
I wrote a simple producer-consumer program using a blocking queue, with multiple producers and multiple consumers putting integers on the queue and taking them off. However, when I tested it, the results weren't what I expected; for example, the reported size of the queue is not correct. I don't think the producer and consumer views of the size are in sync. Moreover, I put a 2 second sleep on both producer and consumer, but when testing, every two seconds it prints out the results of 2 producers and 2 consumers. Does anyone know what I am doing wrong? Maybe I am starting the threads wrong? I commented out another way I did it, but the results were still wrong.

Results:

    run:
    Producing 425 Thread-0 size left 0
    Consuming 890 Thread-3 size left 0
    Consuming 425 Thread-2 size left 0
    Producing 890 Thread-1 size left 0
    Consuming 192 Thread-2 size left 0
    Consuming 155 Thread-3 size left 0
    Producing 155 Thread-1 size left 0
    Producing 192 Thread-0 size left 0
    Consuming 141 Thread-2 size left 1
    Producing 141 Thread-0 size left 0
    Producing 919 Thread-1 size left 0
    Consuming 919 Thread-3 size left 0
    Producing 361 Thread-1 size left 0
    Producing 518 Thread-0 size left 0
    Consuming 518 Thread-3 size left 0
    Consuming 361 Thread-2 size left 0
    Producing 350 Thread-0 size left 1
    Consuming 350 Thread-3 size left 0
    Consuming 767 Thread-2 size left 0
    Producing 767 Thread-1 size left 0

Producer

    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class Producer implements Runnable {

        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

        public Producer(BlockingQueue<Integer> q) {
            this.items = q;
        }

        private int generateRandomNumber(int start, int end) {
            Random rand = new Random();
            int number = start + rand.nextInt(end - start + 1);
            return number;
        }

        public void run() {
            for (int i = 0; i < 5; i++) {
                int rand = generateRandomNumber(100, 1000);
                try {
                    items.put(rand);
                    System.out.println("Producing " + rand + " " + Thread.currentThread().getName() + " size left " + items.size());
                    Thread.sleep(3000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }

Consumer

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class Consumer implements Runnable {

        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

        public Consumer(BlockingQueue<Integer> q) {
            this.items = q;
        }

        public void run() {
            while (true) {
                try {
                    System.out.println("Consuming " + items.take() + " " + Thread.currentThread().getName() + " size left " + items.size());
                    Thread.sleep(3000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }

Test

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ProducerConsumer {

        public static void main(String args[]) {
            BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

            Producer producer = new Producer(items);
            Consumer consumer = new Consumer(items);

            Thread t1 = new Thread(producer);
            Thread t2 = new Thread(producer);
            Thread t3 = new Thread(consumer);
            Thread t4 = new Thread(consumer);

            /*
            Thread t1 = new Thread(new Producer());
            Thread t2 = new Thread(new Producer());
            Thread t3 = new Thread(new Consumer());
            Thread t4 = new Thread(new Consumer());
            */

            t1.start();
            t2.start();
            t3.start();
            t4.start();
        }
    }

UPDATE: I tried to implement the reentrant lock, but my program stops at the lock line. Any help?

Consumer

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class Consumer implements Runnable {

        //private BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
        private MyBlockingQ items;

        public Consumer(MyBlockingQ q) {
            this.items = q;
        }

        public void run() {
            while (true) {
                items.remove();
                //Thread.sleep(1000);
            }
        }
    }

Producer

    import java.util.Random;

    public class Producer implements Runnable {

        private MyBlockingQ items;

        public Producer(MyBlockingQ q) {
            this.items = q;
        }

        private int generateRandomNumber(int start, int end) {
            Random rand = new Random();
            int number = start + rand.nextInt(end - start + 1);
            return number;
        }

        public void run() {
            for (int i = 0; i < 5; i++) {
                int rand = generateRandomNumber(100, 1000);
                items.add(rand);
            }
        }
    }

MyBlockingQ (shared resource)

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class MyBlockingQ {

        private BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        public MyBlockingQ() {
        }

        public void add(Integer i) {
            lock.writeLock().lock();
            try {
                items.put(i);
                System.out.println("Producing " + i + " " + Thread.currentThread().getName() + " size left " + items.size());
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            } finally {
                lock.writeLock().unlock();
            }
        }

        public void remove() {
            lock.writeLock().lock();
            try {
                int taken = items.take();
                System.out.println("Consuming " + taken + " " + Thread.currentThread().getName() + " size left " + items.size());
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

Test

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ProducerConsumer {

        public static void main(String args[]) {
            MyBlockingQ items = new MyBlockingQ();

            System.out.println("starting");

            Thread t1 = new Thread(new Producer(items));
            Thread t2 = new Thread(new Producer(items));
            Thread t3 = new Thread(new Consumer(items));
            Thread t4 = new Thread(new Consumer(items));

            t1.start();
            t2.start();
            t3.start();
            t4.start();
        }
    }
  • In your remove() method in MyBlockingQ you are locking the writeLock(), while you should be locking the readLock(). Commented Oct 10, 2012 at 7:08

3 Answers

5

These two lines

    items.put(rand);
    System.out.println("Producing " + rand + " " + Thread.currentThread().getName() + " size left " + items.size());

are not synchronized, so they do not execute as one atomic step. The producer may put a number on the queue, but by the time the same thread prints the queue size, a consumer may already have taken that number.


3

You are probably confused by this part of the output:

    Producing 425 Thread-0 size left 0
    Consuming 890 Thread-3 size left 0
    Consuming 425 Thread-2 size left 0
    Producing 890 Thread-1 size left 0

Question: How come Thread-3 is consuming item 890 before Thread-1 produces it?

Answer: Thread-3 is not consuming the item before Thread-1 produces it; only the log lines appear in that order.

Why: When Thread-1 puts the item on the queue, Thread-3 is probably already blocked in take(), waiting for an item. So Thread-1 puts the item:

items.put(rand); 

And BEFORE Thread-1 moves on to the next line and prints the info about the item it produced, Thread-3 executes the following line:

System.out.println("Consuming " + items.take() + " " + Thread.currentThread().getName() + " size left " + items.size()); 

Only then does Thread-1 execute its println:

System.out.println("Producing " + rand + " " + Thread.currentThread().getName() + " size left " + items.size()); 

Because of this you can see these funny results in the console.
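The interleaving described above can be forced deterministically. The following is a minimal sketch, not part of the original program: the class name InterleavingDemo and the CountDownLatch are assumptions added for illustration. The latch makes the producer wait until the consumer has taken the item before it calls size(), which is what happens by chance in the output above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: forces "consumer takes between put() and size()".
class InterleavingDemo {

    // Returns the queue size the producer sees after its own put(),
    // when a consumer has already taken the item in between.
    static int sizeSeenByProducer() throws InterruptedException {
        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
        CountDownLatch taken = new CountDownLatch(1);

        Thread consumer = new Thread(() -> {
            try {
                items.take();        // blocks until the producer puts an item
                taken.countDown();   // tell the producer the item is gone
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        items.put(890);              // step 1: produce
        taken.await();               // the consumer runs "in between"
        int size = items.size();     // step 2: size is read only now
        consumer.join();
        return size;
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints "size left 0" even though this thread just put an item.
        System.out.println("size left " + sizeSeenByProducer());
    }
}
```

The put() itself worked; only the size report is stale, because put() and size() are two separate calls on the queue.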

You might want to read about synchronizing. There are 2 ways to solve your problem:

  • synchronized methods
  • synchronized statements (approach used by brimborium)

A synchronized block or method acquires the lock (monitor) of one object. That means that every other thread that wants to enter a block synchronized on the same object has to wait for its turn.

So if you use synchronization on items in both Producer and Consumer then:

  • Consumer cannot take items when Producer is putting them.
  • Producer cannot put items when Consumer is taking them.

In the case where items is empty and the Consumer holds the lock on items while it waits in take(), the program will fall into a so-called deadlock. The Producer has to wait for the Consumer to release the lock, but that will never happen, since the Consumer is waiting to take an item (which has to be placed there by the Producer).
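One way to keep each put or take together with its size report, without the deadlock described above, is to wait on a Condition, because Condition.await() releases the lock while the thread waits. Below is a minimal sketch under stated assumptions: LoggedQueue is a hypothetical wrapper that does not appear in the question, and it uses a plain ArrayDeque instead of a BlockingQueue since the lock already provides the blocking.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical wrapper: modifies the queue and reports its size under one lock.
class LoggedQueue {

    private final Queue<Integer> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    // Adds a value and returns the size observed while still holding the lock.
    int add(int value) {
        lock.lock();
        try {
            items.add(value);
            int size = items.size();
            System.out.println("Producing " + value + " "
                    + Thread.currentThread().getName() + " size left " + size);
            notEmpty.signal();           // wake one waiting consumer
            return size;
        } finally {
            lock.unlock();
        }
    }

    // Blocks until a value is available, then removes and returns it.
    int remove() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();        // releases the lock while waiting
            }
            int taken = items.remove();
            System.out.println("Consuming " + taken + " "
                    + Thread.currentThread().getName() + " size left " + items.size());
            return taken;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LoggedQueue queue = new LoggedQueue();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    queue.remove();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 1; i <= 3; i++) {
            queue.add(i * 100);
        }
        consumer.join();
    }
}
```

Because the println happens while the lock is held, every "size left" value matches the queue state at the moment of that add or remove. The cost is that all producers and consumers are serialized on one lock.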

Moreover, I put a 2 second sleep on both producer and consumer but when testing, every two seconds it prints out the results of 2 producers and 2 consumers.

This is exactly what you should expect. In the Test class you are creating 2 producer threads and 2 consumer threads, and each of them sleeps independently, so all four print once per sleep interval.

    Thread t1 = new Thread(producer);
    Thread t2 = new Thread(producer);
    Thread t3 = new Thread(consumer);
    Thread t4 = new Thread(consumer);

    t1.start();
    t2.start();
    t3.start();
    t4.start();

2 Comments

Instead of synchronizing, would using ReentrantReadWriteLock work?
Yes. ReentrantReadWriteLock is more powerful than synchronized methods/statements. Using synchronized(items) in the Producers will make them wait for each other when putting items, so they put them in turns. You can avoid that situation with ReentrantReadWriteLock.
0

You need to synchronize access to items. I only slightly changed your example and the result looks good. Because of the synchronization, you will also have to take care to avoid deadlocks. In this case it should be fine as long as you don't synchronize over items.take() in the Consumer.

Your new Test:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ProducerConsumer {

        public static void main(String args[]) {
            BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

            Thread t1 = new Thread(new Producer(items));
            Thread t2 = new Thread(new Producer(items));
            Thread t3 = new Thread(new Consumer(items));
            Thread t4 = new Thread(new Consumer(items));

            t1.start();
            t2.start();
            t3.start();
            t4.start();
        }
    }

The consumer

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class Consumer implements Runnable {

        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

        public Consumer(BlockingQueue<Integer> q) {
            this.items = q;
        }

        public void run() {
            while (true) {
                try {
                    System.out.println("Consuming " + items.take() + " " + Thread.currentThread().getName() + " size left " + items.size());
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }

And the producer

    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class Producer implements Runnable {

        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

        public Producer(BlockingQueue<Integer> q) {
            this.items = q;
        }

        private int generateRandomNumber(int start, int end) {
            Random rand = new Random();
            int number = start + rand.nextInt(end - start + 1);
            return number;
        }

        public void run() {
            for (int i = 0; i < 5; i++) {
                int rand = generateRandomNumber(100, 1000);
                try {
                    synchronized (items) {
                        items.put(rand);
                        System.out.println("Producing " + rand + " " + Thread.currentThread().getName() + " size left " + items.size());
                    }
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }

6 Comments

What's the point of this? Now the Producers will have to wait for each other to add elements to the queue. And this will not make the Consumers wait until you put an element AND check the queue size.
The first one is correct and should be that way. For the second part, you are right, that's an issue.
@Zvezdochet Do you have a good solution for the second issue?
I don't know of a simple solution to this issue for BlockingQueue. Why do you need to know the queue size (except for debugging purposes)? You can add a 'monitoring' thread, which prints the size of the queue every few seconds. You can also use explicit synchronization (like you tried), but in that case you probably won't need a BlockingQueue, because the whole point of BlockingQueue is to free clients from explicit synchronization and blocking between Consumers and Producers.
@brimborium I think usage of ReentrantReadWriteLock might help in this situation.
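The 'monitoring' thread suggested in the comments above could look roughly like this. It is only a sketch: the class QueueMonitor, its start method, and the IntConsumer sink parameter are hypothetical names introduced for illustration, and a ScheduledExecutorService stands in for a hand-written thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Hypothetical helper: reports the queue size at a fixed rate.
class QueueMonitor {

    // Starts a background task that passes queue.size() to the sink every periodMillis.
    // The caller is responsible for shutting the returned executor down.
    static ScheduledExecutorService start(BlockingQueue<?> queue, long periodMillis, IntConsumer sink) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> sink.accept(queue.size()), 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
        ScheduledExecutorService monitor =
                start(items, 500, size -> System.out.println("queue size " + size));

        // Stand-in for the producers: put a few items with short pauses.
        for (int i = 0; i < 5; i++) {
            items.put(i);
            Thread.sleep(200);
        }
        monitor.shutdownNow();   // stop reporting so the JVM can exit
    }
}
```

The reported size is still only a snapshot, but it no longer pretends to be tied to a particular put or take, so producers and consumers can keep using the BlockingQueue without extra locking.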