  1. #1
    johnmerlino is offline Member
    Join Date
    May 2014
    Posts
    56
    Rep Power
    0

    Implementing a thread-safe queue

    I have a situation where a user can request that a Java server send a value to an embedded device. If the device is asleep, the value needs to be stored in a queue until the device wakes up and sends a position to the server. At that point the server checks whether there is a value in the queue and, if so, sends it to the device. The maximum size of the queue is 1 for now, and if the user makes repeated requests to the server, the old value is removed and the new value is added.

    Initially I was looking into BlockingQueue, but the problem with that is, well, it blocks: queue.put(value) will block if the queue is full, and queue.take() will block if the queue is empty. I can't have something that blocks. When the device responds to the server, the server checks whether a value is in the queue, and if not, it carries on with the rest of its responsibilities. So I then considered ConcurrentLinkedQueue. While queue.offer(value) and queue.poll() let you add and remove values without blocking, it does not let you set a maximum size for the queue. My queue must have a maximum size, and it must always hold the newest value the user submits (older values are removed).

    So this is what I came up with:

    Java Code:
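    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.List;

    class Unit {
        private final List<Integer> cmdQueue;

        public Unit() {
            cmdQueue = Collections.synchronizedList(new LinkedList<Integer>());
        }

        // Replace whatever is queued with the newest command (at most one entry).
        public void queueCmd(int cmd) {
            synchronized (this) {
                cmdQueue.clear();
                cmdQueue.add(cmd);
            }
        }

        // Remove and return the queued command, or -1 if the queue is empty.
        // The size check and the remove must share queueCmd's lock; otherwise
        // a clear() between them makes remove() throw IndexOutOfBoundsException.
        public int removeQueuedCmd() {
            synchronized (this) {
                if (cmdQueue.size() > 0) {
                    return cmdQueue.remove(cmdQueue.size() - 1);
                } else {
                    return -1;
                }
            }
        }
    }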
    My question is: if I use Collections.synchronizedList, as I do here, do I still need the synchronized block as I did above?
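The question above turns on the difference between a single call being atomic and a sequence of calls being atomic. Collections.synchronizedList makes each individual method call thread-safe, but clear-then-add and check-then-remove are compound actions, so they still need one shared lock. A minimal sketch, assuming a hypothetical class name LatestCommandHolder and choosing to lock on the wrapped list itself (the same object the Javadoc tells callers to synchronize on when iterating):

```java
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

// Hypothetical illustration class; not from the original post.
class LatestCommandHolder {
    private final List<Integer> cmdQueue =
            Collections.synchronizedList(new LinkedList<Integer>());

    // clear() and add() are each atomic, but the pair is not, so hold the lock
    // across both calls.
    void queueCmd(int cmd) {
        synchronized (cmdQueue) {
            cmdQueue.clear();
            cmdQueue.add(cmd);
        }
    }

    // The emptiness check and the removal must happen under the same lock,
    // or another thread could clear the list in between.
    int removeQueuedCmd() {
        synchronized (cmdQueue) {
            return cmdQueue.isEmpty() ? -1 : cmdQueue.remove(0);
        }
    }

    public static void main(String[] args) {
        LatestCommandHolder holder = new LatestCommandHolder();
        holder.queueCmd(1);
        holder.queueCmd(2); // replaces 1
        System.out.println(holder.removeQueuedCmd()); // 2
        System.out.println(holder.removeQueuedCmd()); // -1
    }
}
```

Locking on the enclosing object would work just as well, as long as every compound action uses the same lock. Once all access goes through such a lock, the synchronizedList wrapper is arguably redundant and a plain LinkedList would do.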

  2. #2
    jim829 is offline Senior Member
    Join Date
    Jan 2013
    Location
    Northern Virginia, United States
    Posts
    4,023
    Rep Power
    6

    Re: Implementing a thread-safe queue

    Your description of what you want is a little contradictory (or I am not understanding). You said you can't use BlockingQueue because it blocks, yet you want a queue with a maximum size. What happens when the max size is reached and something needs to be added to the queue? You either block and wait to add it, or you don't block and it doesn't get added. BlockingQueue also lets you offer something to the queue: offer returns true if the item was added or false if it wasn't (because adding it would have blocked). There is also a remainingCapacity method that tells you how much more the queue can hold before it would block.
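To make the non-blocking methods mentioned above concrete, here is a small sketch using ArrayBlockingQueue with a capacity of 1. The class name NonBlockingOfferDemo is made up for illustration; offer, poll, and remainingCapacity are the standard BlockingQueue methods.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; not from the original thread.
class NonBlockingOfferDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);

        System.out.println(queue.remainingCapacity()); // 1
        System.out.println(queue.offer(10));           // true: there was room
        System.out.println(queue.remainingCapacity()); // 0
        System.out.println(queue.offer(20));           // false: full, returns at once

        // poll() never waits either: it returns the head, or null if empty.
        System.out.println(queue.poll());              // 10
        System.out.println(queue.poll());              // null
    }
}
```

Only put and take (and the timed variants of offer and poll) wait; the plain offer and poll calls return immediately.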

    Regards,
    Jim
    The Java™ Tutorials | SSCCE | Java Naming Conventions
    Poor planning on your part does not constitute an emergency on my part

  3. #3
    johnmerlino is offline Member

    Re: Implementing a thread-safe queue

    Quote Originally Posted by jim829
    What happens when the max size is reached and something needs to be added to the queue?
    The maximum size will be 1, and the queue is first emptied before a new item is added to it, as shown in this working example:

    Java Code:
    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Scanner;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    
    public class CommandQueueTest {
    
    	private List<Integer> cmdQueue;
    	int counter=0;
    	private static final int SET_INTERVAL=10000;
    	
    	private void addQueuedCmd(){
    		synchronized(this){
    			System.out.println("Adding to queue");
    			cmdQueue.clear();
    			cmdQueue.add(counter++);
    		}
    	}
    	
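    	// Returns the queued command, or -1 if none. Uses the same lock as
    	// addQueuedCmd so a clear() cannot slip in between size() and remove().
    	public int removeQueuedCmd(){
    		synchronized(this){
    			if(cmdQueue.size() > 0){
    				return cmdQueue.remove(cmdQueue.size()-1);
    			} else {
    				return -1;
    			}
    		}
    	}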
    	
    	private void runTest(){
    		cmdQueue = Collections.synchronizedList(new LinkedList<Integer>());
    		
    		ScheduledExecutorService pollQueue = Executors.newScheduledThreadPool(1);
    		Runnable queue = new Runnable() {
    	        public void run() {
    	        	addQueuedCmd();
    	        }
    	    };
    	    pollQueue.scheduleAtFixedRate(queue,
    	    		SET_INTERVAL, 
    	    		SET_INTERVAL, 
    	            TimeUnit.MILLISECONDS);
    	    
    	    @SuppressWarnings("resource")
    	    Scanner scanner = new Scanner(System.in);
    	    while(true){
    	    	
    	    	System.out.println("Get command from queue: ");
    	    	scanner.nextInt(); // just type any number so that the program removes item from queue
    	    	int cmd=-1;
    	    	if((cmd = removeQueuedCmd()) > -1){
    	    		System.out.println("Send cmd: " + cmd);
    	    		System.out.println("Queue should be empty: " + cmdQueue.size());
    	    	} else {
    	    		System.out.println("No commands in queue.");
    	    	}
    	    }
    
    	}
    	
    	public static void main(String[] args) {
    		CommandQueueTest test = new CommandQueueTest();
    		test.runTest();
    	}
    
    }
    As you can see, when an item is added to the queue, the queue is first emptied, so there will be at most one item. When you then go to remove an item, it is removed and the queue is empty again until the other thread adds another item. This is exactly the behavior I wanted and couldn't achieve with the BlockingQueue interface or ConcurrentLinkedQueue, because the former blocks and the latter doesn't permit a size limit. Hence, I had to roll out my own here. I was wondering why such a functionality doesn't exist in the concurrent packages of Java 8.

  4. #4
    jim829 is offline Senior Member

    Re: Implementing a thread-safe queue

    Quote Originally Posted by johnmerlino
    Hence, I had to roll out my own here. I was wondering why such a functionality doesn't exist in the concurrent packages of Java 8.
    It does! I just did the same thing using an implementation of BlockingQueue and some of its methods, such as isEmpty and poll. But with a queue size of only one, which is what you said, why do you need a queue? Why not just use an object reference? Every time you add an item you clear the queue anyway.
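One way to read the "just use an object reference" suggestion is a single slot held in an AtomicReference: writing overwrites the old value, and reading swaps in null in one atomic step. This is only a sketch under that assumption; the class and method names (LatestCommandSlot, queueCmd, takeQueuedCmd) are invented for illustration and are not from the thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-slot "mailbox"; null means no command is pending.
class LatestCommandSlot {
    private final AtomicReference<Integer> pending = new AtomicReference<>();

    // The newest command simply overwrites any older one. Never blocks.
    void queueCmd(int cmd) {
        pending.set(cmd);
    }

    // Atomically fetch the pending command and clear the slot.
    // Returns null when nothing is pending. Never blocks.
    Integer takeQueuedCmd() {
        return pending.getAndSet(null);
    }

    public static void main(String[] args) {
        LatestCommandSlot slot = new LatestCommandSlot();
        System.out.println(slot.takeQueuedCmd()); // null: nothing queued yet
        slot.queueCmd(1);
        slot.queueCmd(2);                         // replaces 1
        System.out.println(slot.takeQueuedCmd()); // 2
        System.out.println(slot.takeQueuedCmd()); // null: slot was cleared
    }
}
```

Using null for "empty" avoids reserving -1 as a sentinel, and no explicit locking is needed because each operation is a single atomic call.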

    Regards,
    Jim
    Last edited by jim829; 07-10-2014 at 09:41 PM.
