  1. #1
    rushhour (Member)

    Producer Consumer Synchronization Problem

    Hi everyone,

    I am having a problem with a producer-consumer program that uses Threads, ThreadGroups and Java monitors (synchronization).

    I have a Circular Buffer class, which puts and takes items. I also have a Shared Buffer class which extends this Circular Buffer, which uses the synchronized keyword. I also have a Producer Class which produces the number of items requested at creation and a Consumer Class which retrieves items from the Shared Buffer. I have also created a ThreadManager Class, which creates an instance of the Shared Buffer, creates 2 Producer ThreadGroups with different maximum priorities, 4 Producer threads, one Consumer ThreadGroup and 3 Consumer threads.

    My problem is that all Producers and Consumers start, but the Producers get stuck waiting to produce, and each Consumer is only able to retrieve one item. How can I get them all to produce? My code is shown below:

    Java Code:
    /*
     * File: CircularBuffer.java
     * implements the Buffer interface
     */
    public class CircularBuffer3 implements Buffer
    {
    	private ModuloCounter in ;
    	private ModuloCounter out ;
    	
    	private int counter ;
    	private Object bufferData[];
    	
    	private int userBufferSize;
    	private Object value = null;
    	
    	public CircularBuffer3()
    	{
    		this( BUFFER_SIZE );
    	}
    
    	public CircularBuffer3( int bufferSizeValue )
    	{
    		counter = 0;
    		userBufferSize = bufferSizeValue;
    		in = new ModuloCounter( userBufferSize );
    		out = new ModuloCounter( userBufferSize );
    		bufferData = new Object [userBufferSize];
    		
    		for ( int i = 0; i < userBufferSize; i++ )
    		{
    			bufferData[i] = null;
    		}
    	}
    	
    	public void put( Object val )
    	{
    		if ( !isFull() )
    		{
    			value = val;
    			bufferData[in.value()] = val;
    			in.next();
    			counter++;
    		}
    	}
    	
    	public Object take()
    	{
    		if ( !isEmpty())
    		{
    			bufferData[out.value()] = null;
    			out.next();
    			counter--;
    		}
    		return value;
    	}
    	
    	public boolean isEmpty()
    	{
    		/*
    		if ( counter == 0 )
    		{
    			System.out.println( "buffer is empty" );
    			return true;
    		}
    		else
    		{
    			return false;
    		}
    		*/
    		
    		return ( counter == 0 );
    	}
    	
    	public boolean isFull()
    	{
    		if ( counter == userBufferSize )
    		{
    			return true;
    		}
    		else
    		{
    			return false;
    		}
    	}
    	
    	public int getBufferSize()
    	{
    		return userBufferSize;
    	}
    	
    	public String toString()
    	{
    		String output = "";
    		String bufferContent = "";
    		
    		for ( int i = 0; i < userBufferSize; i++ )
    		{
    			if ( bufferData[i] == null )
    			{
    				bufferContent += " -";
    			}
    			else
    			{
    				bufferContent += " *";
    			}
    		}
    		
    		output += "[COUNT = " + counter + "[" + userBufferSize + "]" + " | IN = " + ( in.value()) + " | OUT = " + ( out.value() ) + " | <" + bufferContent + " >]";
    		
    		return output;
    	}
    }
    Java Code:
    /*
     * File: SharedBuffer.java
     * extends the CircularBuffer class and is a monitor
     */
    
    public class SharedBuffer extends CircularBuffer3
    {
    	private boolean available = true;
    	private boolean notAvailable = false;
    	private int counter ;
    	private Object value ;
    		
    		
    		
    	public SharedBuffer( int bsv )
    	{
    		super( bsv ) ;
    	}
    	
    	public synchronized void put( Object val )
    	{
    		while( !available )
    		{
    			try
    			{
    				System.out.println(" Waiting to Produce ");
    				wait( ) ;
    			}
    			catch ( InterruptedException e )
    			{
    				
    			}
    		}
    		
    		value = val ;
    		super.put( val ) ;
    		notAvailable = true ;
    		counter++ ;
    		
    		//System.out.println( "SharedBuffer.put( ) : " + super.toString( ) ) ;
    		
    		notifyAll( ) ;
    	}
    	
    	public synchronized Object take( )
    	{
    		while( !notAvailable )
    		{
    			try
    			{
    				System.out.println( "Waiting to consume" );
    				wait( ) ;
    			}
    			catch ( InterruptedException e )
    			{
    				
    			}
    		}
    		
    		available = false ;
    		notifyAll( ) ;
    		super.take( ) ;
    		counter-- ;
    		//System.out.println( "SharedBuffer.take( ) : " + super.toString( ) ) ;
    		return value ;
    	}
    }
    Java Code:
    /*
     * File: ProducerClass.java
     * This will produce a number of items at creation
     */
    
    public class ProducerClass extends Thread
    {
    	private SharedBuffer sb ;
    	private int items ;
    	private ThreadGroup tg ;
    	private int seconds ;
    	
    	public ProducerClass( String id, SharedBuffer sb, int items, ThreadGroup tg, int seconds )
    	{
    		super( id ) ;
    		this.sb = sb ;
    		this.items = items ;
    		this.tg = tg ;
    	}
    
    	public void run( )
    	{
    		System.out.println( getName( ) + " Thread Started" ) ;
    		
    		for ( int i = 0; i < items; i++ )
    		{
    			try
    			{
    				sleep( ( int ) ( Math.random( ) * ( seconds * 1000 ) ) ) ;
    			}
    			catch ( InterruptedException e )
    			{
    				
    			}
    			
    			ThreadDescriptor td = new ThreadDescriptor( this, tg ) ;
    				
    			sb.put( i ) ;
    			
    			System.out.println( getName( ) + ".put( ) : " + td.toString ( ) ) ;
    		}
    		
    		System.out.println( getName( ) + " managed to Produce " + items + " items." ) ;
    	}
    }
    Java Code:
    /*
     * File: ConsumerClass.java
     * This will retrieve items from the shared buffer
     */
    
    public class ConsumerClass extends Thread
    {
    	private SharedBuffer sb ;
    	private ThreadGroup tg ;
    	private int items = 0 ;
    
    	public ConsumerClass( String id, SharedBuffer sb, ThreadGroup tg )
    	{
    		super( id ) ;
    		this.sb = sb ;
    		this.tg = tg ;
    	}
    
    	public void run( )
    	{
    		System.out.println( getName( ) + " Thread Started" ) ;
    		
    		System.out.println( getName()+ ".take( ) : " + sb.take( ) );
    		yield();
    		items++ ;
    		System.out.println( getName( ) + " managed to retrieve " + items + " items." ) ;
    	}
    }
    Java Code:
    /*
     * File: Threadmanager.java
     * This will create and co-ordinate ThreadGroups
     */
    
    public class ThreadManager
    {
    	public static void main( String[] args )
    	{
    		// create a SharedBuffer Object to hold 5 items
    		SharedBuffer sb = new SharedBuffer( 5 ) ;
    		
    		// Create two Producer ThreadGroups, one for high priority, one for low priority 
    		ThreadGroup prodHighPri = new ThreadGroup( "High Priority Producer" ) ;
    		ThreadGroup prodLowPri = new ThreadGroup( "Low Priority Producer" ) ;
    		
    		// Set the max priority of the above ThreadGroups to 8 and 3 ( high and low ) 
    		prodHighPri.setMaxPriority( 8 ) ;
    		prodLowPri.setMaxPriority( 3 ) ;
    		
    		// create 4 Producer threads ( producer name, SharedBuffer, items, ThreadGroup, sleep time )
    		ProducerClass producer1 = new ProducerClass( "Producer 1", sb, 5, prodHighPri, 3 ) ;
    		ProducerClass producer2 = new ProducerClass( "Producer 2", sb, 5, prodHighPri, 3 ) ;
    		ProducerClass producer3 = new ProducerClass( "Producer 3", sb, 5, prodLowPri, 3 ) ;
    		ProducerClass producer4 = new ProducerClass( "Producer 4", sb, 5, prodLowPri, 3 ) ;
    		
    		// Create one Consumer ThreadGroup 
    		ThreadGroup consumerThreadGroup = new ThreadGroup( "Consumer Thread Group" ) ;
    		
    		// Create 3 Consumer threads which retrieve items from the SharedBuffer
    		ConsumerClass consumer1 = new ConsumerClass( "Consumer 1", sb, consumerThreadGroup ) ;
    		ConsumerClass consumer2 = new ConsumerClass( "Consumer 2", sb, consumerThreadGroup ) ;
    		ConsumerClass consumer3 = new ConsumerClass( "Consumer 3", sb, consumerThreadGroup ) ;
    		ConsumerClass consumer4 = new ConsumerClass( "Consumer 4", sb, consumerThreadGroup ) ;
    		// Start all Producer Threads
    		producer1.start( ) ;
    		producer2.start( ) ;
    		producer3.start( ) ;
    		producer4.start( ) ;
    		
    		// Start all Consumer Threads
    		consumer1.start( ) ;
    		consumer2.start( ) ;
    		consumer3.start( ) ;
    		consumer4.start( ) ;
    	}
    }
    Sample output looks like this:
    Java Code:
    Producer 2 Thread Started
    Producer 1 Thread Started
    Producer 3 Thread Started
    Consumer 1 Thread Started
    Waiting to consume
    Consumer 3 Thread Started
    Waiting to consume
    Producer 4 Thread Started
    Consumer 2 Thread Started
    Waiting to consume
    Consumer 4 Thread Started
    Waiting to consume
    Producer 1.put( ) : [ID = Producer 1 | GRP = High Priority Producer | PRI = 8 | VN = 1]
     Waiting to Produce 
    Producer 4.put( ) : [ID = Producer 4 | GRP = Low Priority Producer | PRI = 3 | VN = 1]
    Consumer 2.take( ) : 0
     Waiting to Produce 
     Waiting to Produce 
    Consumer 2 managed to retrieve 1 items.
    Producer 3.put( ) : [ID = Producer 3 | GRP = Low Priority Producer | PRI = 3 | VN = 1]
     Waiting to Produce 
    Consumer 1.take( ) : 0
     Waiting to Produce 
    Consumer 3.take( ) : 0
    Consumer 1 managed to retrieve 1 items.
    Consumer 3 managed to retrieve 1 items.
     Waiting to Produce 
     Waiting to Produce 
    Consumer 4.take( ) : 0
    Consumer 4 managed to retrieve 1 items.
    Hopefully you can help me solve my problem.

  2. #2
    eRaaaa (Senior Member)

    Uff, too much code :)
    But have you considered the java.util.concurrent package?
    The BlockingQueue implementations are designed to be used primarily for producer-consumer queues, perhaps it will help you.
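
    For comparison, here is a minimal sketch of that approach. It assumes an ArrayBlockingQueue of capacity 5 to mirror the buffer size in the original post; the class name BlockingQueueSketch, the runDemo helper and the thread and item counts are illustrative choices, not part of the original program.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: 4 producers and 4 consumers sharing one bounded queue.
class BlockingQueueSketch {

    // Starts the threads, waits for them to finish and returns
    // the number of items the consumers retrieved.
    static int runDemo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        AtomicInteger consumed = new AtomicInteger();
        Thread[] threads = new Thread[8];

        for (int t = 0; t < 4; t++) {
            threads[t] = new Thread(() -> {
                try {
                    for (int i = 0; i < 5; i++) {
                        queue.put(i); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer " + (t + 1));

            threads[t + 4] = new Thread(() -> {
                try {
                    for (int i = 0; i < 5; i++) {
                        queue.take(); // blocks while the queue is empty
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer " + (t + 1));
        }

        for (Thread th : threads) {
            th.start();
        }
        try {
            for (Thread th : threads) {
                th.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("Consumed " + runDemo() + " items");
    }
}
```

    The queue does all the waiting and notifying internally, so neither the producers nor the consumers need the synchronized keyword here.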

  3. #3
    rushhour (Member)

    Thanks for your reply,

    but I am supposed to create my own Producer/Consumer solution, and specifically have to implement it using a ThreadManager class and a Producer Class, Consumer Class and Shared Buffer.

    Why would it be stuck on waiting to produce?

    Sorry for the long code posting!
    Last edited by rushhour; 11-23-2010 at 06:21 PM.

  4. #4
    JosAH (Moderator)

    Oh dear, all that code while producers, consumers and the shared resource (your circular buffer) are so simple:

    A producer has to wait if the buffer is full and a consumer has to wait as long as the buffer is empty. The buffer is the shared resource so both the producer(s) and consumer(s) have to synchronize on it. Both the producer(s) and consumer(s) have to have a reference to the (shared) buffer. For the producer this leads to the following:

    Java Code:
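    // Lives in a class that holds a reference to the shared 'buffer'.
    void produce(Product p) throws InterruptedException {
       synchronized (buffer) {
          while (buffer.isFull())   // recheck after every wake-up
             buffer.wait();         // releases the lock while waiting
          buffer.put(p);
          buffer.notifyAll();       // wake any waiting consumers
       }
    }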
    For the consumer the (almost) identical code goes like this:

    Java Code:
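    Product consume() throws InterruptedException {
       synchronized (buffer) {
          while (buffer.isEmpty())  // recheck after every wake-up
             buffer.wait();         // releases the lock while waiting
          Product p = buffer.get();
          buffer.notifyAll();       // wake any waiting producers
          return p;
       }
    }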
    Both the producer(s) and consumer(s) notify everything waiting on that shared resource because something was either added to the buffer (the consumers want that) or something was removed from that buffer (the producers like that). Note the while statement in both pieces of code: a thread that wakes up must recheck its condition, because by the time it reacquires the lock the buffer may be full again for the producer(s) or empty again for the consumer(s).
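
    Applied to a monitor class like the SharedBuffer in the first post, the same pattern might look like the sketch below. It is only an illustration under stated assumptions: an ArrayDeque stands in for the circular buffer, and the class name MonitorBufferSketch, the runDemo helper and the thread and item counts are hypothetical. The point it shows is that the wait conditions come from the buffer's own state (full or empty) rather than from separate boolean flags, and that each consumer takes in a loop rather than once.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical monitor-style bounded buffer; ArrayDeque replaces the circular buffer.
class MonitorBufferSketch {
    private final Deque<Object> items = new ArrayDeque<>();
    private final int capacity;

    MonitorBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(Object val) throws InterruptedException {
        while (items.size() == capacity) { // full: producers wait
            wait();
        }
        items.addLast(val);
        notifyAll(); // something was added, consumers may proceed
    }

    synchronized Object take() throws InterruptedException {
        while (items.isEmpty()) { // empty: consumers wait
            wait();
        }
        Object val = items.removeFirst();
        notifyAll(); // something was removed, producers may proceed
        return val;
    }

    // 4 producers each put 0..4; 4 consumers each take 5 items.
    // Returns the sum of all values taken.
    static int runDemo() {
        MonitorBufferSketch buffer = new MonitorBufferSketch(5);
        AtomicInteger sum = new AtomicInteger();
        Thread[] threads = new Thread[8];

        for (int t = 0; t < 4; t++) {
            threads[t] = new Thread(() -> {
                try {
                    for (int i = 0; i < 5; i++) {
                        buffer.put(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer " + (t + 1));

            threads[t + 4] = new Thread(() -> {
                try {
                    for (int i = 0; i < 5; i++) {
                        sum.addAndGet((Integer) buffer.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer " + (t + 1));
        }

        for (Thread th : threads) {
            th.start();
        }
        try {
            for (Thread th : threads) {
                th.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println("Sum of consumed values: " + runDemo());
    }
}
```

    Because every put and take ends with notifyAll() and every waiter rechecks its condition in a while loop, no thread stays blocked once the buffer state allows it to continue.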

    kind regards,

    Jos
    Last edited by JosAH; 11-23-2010 at 07:46 PM.
    cenosillicaphobia: the fear for an empty beer glass
