  1. #1
    johnmerlino is offline Member
    Join Date
    May 2014
    Posts
    56
    Rep Power
    0

    How to address the relinquish issue with wait()

    I have multiple threads, and each of them sends packets to an external firmware device and waits for a response. Now let's say two threads both request command 1 for unit 1. I could use a synchronized block so that one thread sends the command and waits for the response, and only when that thread is finished does the other thread send its command and wait for its response. That would be great. The problem is that I need to wait for a response from the unit, so I invoke wait() to give the unit some time to respond. However, by calling wait() the thread relinquishes the intrinsic lock, and the other thread can now acquire it. Then, when the response to the first command comes in, receivedRemoteAck is set to true, and thus both of the waiting threads will be finished. When the second response comes in, there are no threads waiting for it.

    Here is code describing what I am talking about:

    // sending the command
    Java Code:
    public boolean sendRemoteCmd(int ref, DatagramSocket socket) throws InterruptedException {
        int i;
        RemoteCommand cmd = remoteCmds.get(ref);

        synchronized (this) {
            for (i = 0; i < RETRIES; i++) {
                // stop retrying once the receiving thread has flagged an ack
                if (receivedRemoteAck) {
                    receivedRemoteAck = false;
                    break;
                }
                DatagramPacket request = new DatagramPacket(cmd.packet(), cmd.packet().length,
                        address(), port());
                try {
                    socket.send(request);
                } catch (IOException e) {
                    e.printStackTrace();
                }

                // wait() releases this object's monitor until notified or timed out
                wait(DEVICE_SEND_TIMEOUT * 1000);
            }
        }

        // all retries used up without seeing an ack
        if (i == RETRIES) {
            remoteRequestCanceled = true;
            return false;
        }
        return true;
    }
    // receiving the response:
    Java Code:
    Unit unit = Unit.findByAddress(packet.getAddress());
    if (unit != null) {
        if (unit.containsRef(msg)) {
            synchronized (unit) {
                unit.removeCmd(msg);

                if (!unit.remoteRequestCanceled()) {
                    unit.setReceivedCmd(true);
                    unit.notify();
                } else {
                    unit.setRemoteRequestCanceled(true);
                }
            }
        }
    }
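
    The relinquish behaviour described in this post can be seen in isolation. The following is only a minimal sketch, not taken from the code above: the class name WaitReleasesLock, the method demo() and the event strings are made up for illustration. It records the order in which two threads hold the same monitor, and the second thread can only enter the synchronized block because the first one has called wait().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; none of these names come from the original code.
class WaitReleasesLock {
    private final Object lock = new Object();
    private boolean ready = false; // guarded by lock
    private final List<String> events =
            Collections.synchronizedList(new ArrayList<String>());

    /** Runs both threads and returns the order in which they held the lock. */
    List<String> demo() {
        final CountDownLatch waiterHasLock = new CountDownLatch(1);

        Thread waiter = new Thread(new Runnable() {
            public void run() {
                synchronized (lock) {
                    events.add("waiter enters");
                    waiterHasLock.countDown();
                    while (!ready) { // loop guards against spurious wakeups
                        try {
                            lock.wait(); // gives up the monitor while waiting
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    events.add("waiter resumes");
                }
            }
        });

        Thread other = new Thread(new Runnable() {
            public void run() {
                try {
                    waiterHasLock.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // The waiter still owns the monitor until it calls wait(),
                // so this block is entered only after that point.
                synchronized (lock) {
                    events.add("other enters");
                    ready = true;
                    lock.notifyAll();
                }
            }
        });

        waiter.start();
        other.start();
        try {
            waiter.join();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(events);
    }

    public static void main(String[] args) {
        System.out.println(new WaitReleasesLock().demo());
    }
}
```

    The second thread gets in between "waiter enters" and "waiter resumes", which is the same window in which a second sendRemoteCmd call can start on the same unit.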

  2. #2
    jim829 is offline Senior Member
    Join Date
    Jan 2013
    Location
    Northern Virginia, United States
    Posts
    6,226
    Rep Power
    14

    Re: How to address the relinquish issue with wait()

    I am not 100% certain how you are going about this. But a UDP connection has a local port and a foreign port, just like TCP, so each outgoing connection to a server port has a unique local port. When a packet returns, the underlying communication system is supposed to use the local port to demux the packet up to the proper connection. Or are you sharing the same UDP connection among all of your threads? It also seems like you are sharing the same ack flag across all threads.

    Regards,
    Jim
    The Java™ Tutorials | SSCCE | Java Naming Conventions
    Poor planning on your part does not constitute an emergency on my part

  3. #3
    johnmerlino is offline Member

    The reference number will be different for each response. But the problem is that wait() and notify() are being called on the unit object itself. For example, one user sends a command for a specific unit through the console. Another user sends the same command for the same unit through a TCP connection. Now the sendRemoteCmd function is being called twice for the same command and the same unit. Yes, the two calls will have different reference numbers. But when I call notify() on that unit when the packet comes in, it will wake up BOTH the console thread and the TCP thread for that unit. I need to wake up only the right thread, not both of them.
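
    One way to wake only the thread that owns a given reference number is to keep one waitable object per outstanding request instead of waiting on the unit. The following is only a sketch of that idea, not the code from this project: the class PendingAcks and its methods register, await and onAck are hypothetical names, and it swaps wait()/notify() for java.util.concurrent.CompletableFuture, which assumes Java 8 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical registry of outstanding requests, keyed by reference number.
class PendingAcks {
    private final ConcurrentHashMap<Integer, CompletableFuture<Integer>> pending =
            new ConcurrentHashMap<Integer, CompletableFuture<Integer>>();

    /** Sender side: register the reference number before the packet goes out. */
    CompletableFuture<Integer> register(int ref) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        pending.put(ref, future);
        return future;
    }

    /** Sender side: wait for this reference only; returns the ack, or null on timeout. */
    Integer await(int ref, CompletableFuture<Integer> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            // Stop listening, so a late packet finds no waiter for this ref.
            pending.remove(ref, future);
        }
    }

    /** Receiver side: complete only the waiter registered for this reference. */
    boolean onAck(int ref, int ack) {
        CompletableFuture<Integer> future = pending.remove(ref);
        return future != null && future.complete(ack);
    }

    public static void main(String[] args) {
        PendingAcks acks = new PendingAcks();
        CompletableFuture<Integer> first = acks.register(1);
        CompletableFuture<Integer> second = acks.register(2);
        acks.onAck(2, 0x1);                              // packet for ref 2 arrives
        System.out.println(acks.await(2, second, 100));  // the ack value
        System.out.println(acks.await(1, first, 100));   // null: nothing came for ref 1
    }
}
```

    In this sketch the console thread and the TCP thread would each call register with their own reference number, so an incoming packet can only ever complete the request it belongs to.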

  4. #4
    jim829 is offline Senior Member

    You wrote in your first message:

    Now when the response comes in from the one command, the receivedRemoteAck will be set to true, and thus both of the waiting threads will be finished
    Isn't there a way to have a separate receive ack for each thread and then check to see which one applies to the incoming packet? I guess I would need to see a complete coded example to help you because whatever I come up with probably isn't in line with your actual problem.

    Also, I am not certain what you mean by reference numbers. Are you talking about the local or foreign port from getPort()?

    In addition, even if I send all the packets from a single thread, I can process them without problems based on the local and remote port numbers. And if one is from UDP and the other from TCP, then even if the two port numbers are the same, the protocol ids are different. Now if I am running my own protocol over those connections, then I may need to manage a certain amount of state for that protocol.

    Caveat: This is how I did it many years ago in C, when I had to create the IP headers myself. I also did not employ threads, but the demand never warranted it.

    Regards,
    Jim
    Last edited by jim829; 06-18-2014 at 05:34 AM.

  5. #5
    johnmerlino is offline Member

    I created this mock-up program to reproduce the issue. It has two classes, ThreadNotifier and Unit:

    ThreadNotifier:
    Java Code:
    import java.util.ArrayList;
    import java.util.Scanner;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class ThreadNotifier {
        private static BlockingQueue<ArrayList<Integer>> queue = new ArrayBlockingQueue<ArrayList<Integer>>(1);
    
    
    	public void udpResponder() throws InterruptedException{
            //Thread.sleep(2000);
    
            System.out.println("Type reference number: ");
            Scanner scanner = new Scanner(System.in);
            String ref = scanner.nextLine(); // ref is unique value
            int ack = 0x1; // in real life ack will be part of packet
            
            // in real life I find unit by address from socket
            Unit unit = Unit.findById(1);
            synchronized (unit){
            	ArrayList<Integer> list = new ArrayList<Integer>();
            	list.add(ack);
            	list.add(Integer.parseInt(ref));        	
            	queue.put(list);
                unit.setCmdReceived(true);
                unit.notify();
            }
            scanner.close();
    
        }
    	
    	public void consoleResponder() throws InterruptedException{
    		Unit unit = Unit.findById(1);
    		unit.sendCmd(1);
    	}
    	
    	public void tcpResponder() throws InterruptedException{
    		Unit unit = Unit.findById(1);
    		unit.sendCmd(1);
    	}
    	
    	
    	public void startService(){	
    		Unit unit = new Unit(1, queue);
    		Unit.addUnit(unit);
    
    		Thread udpResponder = new Thread(new Runnable(){
                public void run() {
                    try {
                        udpResponder();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }           
            });
    
            Thread consoleResponder = new Thread(new Runnable(){
                public void run() {
                    try {
    					consoleResponder();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
     
                }           
            });
    
            Thread tcpResponder = new Thread(new Runnable(){
                public void run() {
                	try {
    					tcpResponder();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
                }           
            });
    
            udpResponder.start();
            consoleResponder.start();
            tcpResponder.start();
    	}
    	
    	public static void main(String[] args) {
    		ThreadNotifier program = new ThreadNotifier();
    		program.startService();
    	}
    
    }
    Unit:
    Java Code:
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.concurrent.BlockingQueue;
    	
    public class Unit {
    	private int id;
    	private static HashMap<Integer, Unit> units = new HashMap<Integer, Unit>();
    	private boolean cmdReceived=false;
    	private final int DEVICE_SEND_TIMEOUT=1000;
    	BlockingQueue<ArrayList<Integer>> queue;
        
        public Unit(int id, BlockingQueue<ArrayList<Integer>> queue){
        	this.id = id;
        	this.queue = queue;
        }
        
        public int id(){
        	return id;
        }
        
    	public void sendCmd(int cmd) throws InterruptedException{
    		int i;
    		ArrayList<Integer> unitResp = new ArrayList<Integer>();
    		
    		// sending command over udp socket
    		
    		
    		// waiting for response
    		synchronized(this){
    			for(i=0;i<3;i++){
    				if(cmdReceived){
    					unitResp = queue.take();
    					break;
    				}
    				wait(DEVICE_SEND_TIMEOUT);
    			}
    		}
    
    		if(i==3){
    			System.out.println("Unit did not respond.");
    		} else {
    			System.out.println("Unit ACK " + unitResp.get(0) + " REF" + unitResp.get(1));
    		}
    		
    	}
    	
    	public void setCmdReceived(boolean cmdReceived){
    		this.cmdReceived = cmdReceived;
    	}
    	
        public static void addUnit(Unit unit){
        	units.put(unit.id(), unit);
        }
        
        public static Unit findById(int id){
        	return units.get(id);
        }
    }
    If you place the two classes in a new Java Project in Eclipse and then press run, you will see a little message in console saying "Type reference number:". In reality, that will be the actual physical firmware sending a udp packet via gsm to the java server, which contains a unique reference number.

    When I press a number like 1 in the console, you will get a message that looks like this:
    Unit ACK 1 REF1

    And that is it. That's exactly the problem. You should also get a second message that says "Unit did not respond.", because the two threads consoleResponder and tcpResponder execute at the same time: one of the threads gets the response, and the other thread is left in some limbo state. I have no idea what it is doing.

    Ultimately, I need to associate the reference number being sent in the packet with the thread that it was sent from. If I can do that, then I can do something like this in the sendCmd method:

    Java Code:
    synchronized(this){
        for(i=0;i<3;i++){
            if(cmdReceived){
                unitResp = queue.take();

                // index 0 holds the ack, index 1 holds the reference number
                if(unitResp.get(1).equals(referenceNumber)){
                    break;
                } else {
                    cmdReceived=false;
                }

            }
            wait(DEVICE_SEND_TIMEOUT);
        }
    }
    Last edited by johnmerlino; 06-18-2014 at 08:57 AM.

  6. #6
    jim829 is offline Senior Member

    John,

    I figured it out. I tried some stuff and had to go back to the documentation. The take() method of BlockingQueue blocks (duh). So it just hangs there, waiting for the queue to receive another entry. Whichever thread removes the entry first wins, and the other one blocks.
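
    A small sketch of the difference, assuming nothing beyond the standard java.util.concurrent API (the class PollInsteadOfTake and the method nextResponse are made-up names): poll() with a timeout hands back null when the queue stays empty, where take() would keep waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing a timed poll in place of take().
class PollInsteadOfTake {
    /** Returns the next response, or null if none arrives within timeoutMillis. */
    static Integer nextResponse(BlockingQueue<Integer> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(1);
        queue.offer(42);
        System.out.println(nextResponse(queue, 100)); // first caller gets the entry
        System.out.println(nextResponse(queue, 100)); // second caller gets null instead of hanging
    }
}
```

    With a timed poll, the thread that loses the race could report "Unit did not respond." rather than sitting inside the synchronized block forever.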

    Regards,
    Jim

  7. #7
    johnmerlino is offline Member

    I completely removed the blocking queue functionality, and instead of synchronizing on the unit object, I am now synchronizing on the specific remote command object. Since each remote command is unique, I can notify an individual remote command object. Here is a dummy example:

    // ThreadNotifier.java
    Java Code:
    import java.util.Scanner;
    
    public class ThreadNotifier {
    
    	public void udpResponder() throws InterruptedException{
    		int ack=0;
    		
    		// in our test, the ref numbers increment by 1, starting from 1
    		// so press 1 and then press 2 in the prompt!
    		Scanner scanner = new Scanner(System.in);
    		while(true){
    			System.out.println("Type reference number: ");
    			int ref = scanner.nextInt();			
    			ack++; // in real life ack will be part of packet
    						
    	        // in real life I find unit by address from socket
    	        Unit unit = Unit.findById(1);
    	        
    	        // this may return null sometimes, since unit will remove remoteCmd
    	        // once it has cancelled the request
    	        RemoteCommand remoteCmd = unit.findRemoteCmd(ref);
    	        
    	        if(remoteCmd != null && !remoteCmd.cancelled()){
    		        synchronized (remoteCmd){
    		        	remoteCmd.setAck(ack);
    		        	remoteCmd.received(true);
    		        	remoteCmd.notify();
    		        }
    	        }
    	        // scanner.close();
    		}
        }
    	
    	public void consoleResponder() throws InterruptedException{
    		Unit unit = Unit.findById(1);
    		unit.sendCmd(1);
    	}
    	
    	public void tcpResponder() throws InterruptedException{
    		Unit unit = Unit.findById(1);
    		unit.sendCmd(1);
    	}
    	
    	
    	public void startService(){	
    		Unit unit = new Unit(1);
    		Unit.addUnit(unit);
    
    		Thread udpResponder = new Thread(new Runnable(){
                public void run() {
                    try {
                        udpResponder();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }           
            });
    
            Thread consoleResponder = new Thread(new Runnable(){
                public void run() {
                    try {
    					consoleResponder();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
     
                }           
            });
    
            Thread tcpResponder = new Thread(new Runnable(){
                public void run() {
                	try {
    					tcpResponder();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
                }           
            });
    
            udpResponder.start();
            consoleResponder.start();
            tcpResponder.start();
    	}
    	
    	public static void main(String[] args) {
    		ThreadNotifier program = new ThreadNotifier();
    		program.startService();
    	}
    
    }
    Unit.java
    Java Code:
    import java.util.HashMap;
    	
    public class Unit {
    	private int id;
    	private int ref=0;
    	
    	private HashMap<Integer, RemoteCommand> cmds=new HashMap<Integer, RemoteCommand>();
    	private static HashMap<Integer, Unit> units = new HashMap<Integer, Unit>();
    	
        public Unit(int id){
        	this.id = id;
        }
        
        public int id(){
        	return id;
        }
        
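        // synchronized because the console and tcp threads both call sendCmd on the same unit
        private synchronized int nextRef(){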
        	return ++ref;
        }
     
    	public void sendCmd(int cmd) throws InterruptedException{
    		RemoteCommand remoteCmd = new RemoteCommand(nextRef(), cmd, this);
    		if(addRemoteCmd(remoteCmd)){
    			sendRemoteCmd(remoteCmd);
    		}		
    	}
    	
    	public boolean addRemoteCmd(RemoteCommand remoteCmd){
    		if(cmds.get(remoteCmd.ref()) != null){
    			System.out.println("Already sending command to device with this ref number...");
    			return false;
    		}
    		cmds.put(remoteCmd.ref(), remoteCmd);
    		return true;
    	}
    
    	public void sendRemoteCmd(RemoteCommand remoteCmd) throws InterruptedException{
    		remoteCmd.send();
    		removeRemoteCmd(remoteCmd);
    	}
    	
    	// might not want to remove the cmd since notifier thread will try
    	// to find it when packet comes in over udp socket
    	public void removeRemoteCmd(RemoteCommand remoteCmd){
    		cmds.remove(remoteCmd.ref()); // the map is keyed by ref, not by the command object
    	}
    	
    	public RemoteCommand findRemoteCmd(int ref){
    		return cmds.get(ref);
    	}
    	
    	
    		
        public static void addUnit(Unit unit){
        	units.put(unit.id(), unit);
        }
        
        public static Unit findById(int id){
        	return units.get(id);
        }
    }
    RemoteCommand.java

    Java Code:
    public class RemoteCommand {
    	private int ref;
    	private int ack;
    	private int cmd;
    	private Unit parent;
    	private boolean received=false;
    	private boolean cancelled=false;
    	private final int MAX_TRIES=3;
    	private final int DEVICE_SEND_TIMEOUT=5;
    	
    	public RemoteCommand(int ref, int cmd, Unit parent){
    		this.ref = ref;
    		this.cmd = cmd;
    		this.parent = parent;
    	}
    	
    	public int ref(){
    		return ref;
    	}
    	
    	public int ack(){
    		return ack;
    	}
    	
    	public void setAck(int ack){
    		this.ack = ack;
    	}
    	
    	public Unit parent(){
    		return parent;
    	}
    	
    	public int cmd(){
    		return cmd;
    	}
    	
    	public boolean received(){
    		return received;
    	}
    	
    	public void received(boolean received){
    		this.received = received;
    	}
    	
    	public boolean cancelled(){
    		return cancelled;
    	}
    	
    	public void cancelled(boolean cancelled){
    		this.cancelled = cancelled;
    	}
    	
    	// received and canceled are how two threads know each other's status of same object.
    	// received indicates to waiting thread that request was successful
    	// cancelled indicates to notifier thread that waiting thread stopped waiting
    	public void send() throws InterruptedException{
    		int i;
    		
    		synchronized(this){
    			// simulate sending command over udp
    			
    			for(i=0;i<MAX_TRIES;i++){
    				if(received()){
    					break;
    				}
    				try {
    					wait(DEVICE_SEND_TIMEOUT*1000);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
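    			// check the flag rather than the loop counter, so an ack that
    			// arrives during the last wait still counts as received
    			if(!received()){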
    				System.out.println("Unit did not respond in adequate time. Now cancelling request...");
    				// if response eventually comes in from unit
    				// inform UdpResponder thread we stopped waiting
    				cancelled(true);
    			} else {
    				System.out.println("Unit ACK " + ack() + " REF" +  ref());
    			}
    		}
    	}
    }
    What do you think?
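
    For comparison, a common way to write a timed wait is a guarded loop against a deadline that rechecks the flag after every wakeup, including the last one. This is only a sketch under my own assumptions: the class AckWaiter and its methods deliver and await are hypothetical and stand in for the received/ack handling in RemoteCommand.

```java
// Hypothetical stand-in for the received/ack part of RemoteCommand.
class AckWaiter {
    private boolean received = false; // guarded by this
    private int ack;                  // guarded by this

    /** Called by the receiving thread when the matching packet arrives. */
    synchronized void deliver(int ack) {
        this.ack = ack;
        this.received = true;
        notifyAll();
    }

    /** Waits up to timeoutMillis for deliver(); returns the ack, or null on timeout. */
    synchronized Integer await(long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1000000L;
        while (!received) { // rechecked after every wakeup
            long remainingMillis = (deadline - System.nanoTime()) / 1000000L;
            if (remainingMillis <= 0) {
                return null; // deadline passed without an ack
            }
            try {
                wait(remainingMillis); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return ack;
    }

    public static void main(String[] args) {
        final AckWaiter waiter = new AckWaiter();
        new Thread(new Runnable() {
            public void run() {
                waiter.deliver(1); // simulates the udp responder thread
            }
        }).start();
        System.out.println("ack = " + waiter.await(2000));
        System.out.println("no ack = " + new AckWaiter().await(100));
    }
}
```

    Because the loop condition is the flag itself, a notify that lands just before the timeout is still seen, and a wakeup without a notify simply goes back to waiting for the remaining time.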

  8. #8
    jim829 is offline Senior Member

    Originally posted by johnmerlino:
    What do you think?
    I didn't expect you to rewrite it, but just to alter the way you use the queue (perhaps using the peek() or poll() methods instead of the take() method).

    But more importantly, does it work like you want? Does it do what you expect? I probably wouldn't do it your way. And someone else probably wouldn't do it the way I do it. As long as it works and you understand what is going on, then that's fine. You may learn additional stuff later on which may help you improve it (assuming it needs improving) or apply it to similar efforts in the future.

    Regards,
    Jim
