100

I have a class that takes objects from a BlockingQueue and processes them by calling take() in a continuous loop. At some point I know that no more objects will be added to the queue. How do I interrupt the take() method so that it stops blocking?

Here's the class that processes the objects:

    public class MyObjHandler implements Runnable {
        private final BlockingQueue<MyObj> queue;

        public MyObjHandler(BlockingQueue<MyObj> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    MyObj obj = queue.take(); // blocks until an element is available
                    // process obj here
                    // ...
                }
            } catch (InterruptedException e) {
                // restore the interrupt flag and let the thread exit
                Thread.currentThread().interrupt();
            }
        }
    }

And here's the method that uses this class to process objects:

    public void testHandler() throws InterruptedException {
        BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
        MyObjHandler handler = new MyObjHandler(queue);
        new Thread(handler).start();

        // get objects for handler to process
        for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
            queue.put(i.next());
        }

        // what code should go here to tell the handler
        // to stop waiting for more objects?
    }

5 Answers

86

If interrupting the thread is not an option, another approach is to place a "marker" or "command" object on the queue. MyObjHandler recognizes this object as a stop signal and breaks out of the loop when it takes it.
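A minimal sketch of the marker-object idea, assuming String work items for brevity. The names `PoisonPillDemo`, `POISON_PILL`, `Handler` and `runDemo` are hypothetical and do not come from the question; the marker is a dedicated instance compared by identity, so it can never be confused with a real item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillDemo {
    // Hypothetical marker object: a unique instance, recognized by identity (==).
    static final String POISON_PILL = new String("POISON_PILL");

    static class Handler implements Runnable {
        private final BlockingQueue<String> queue;
        final List<String> processed = new ArrayList<>();

        Handler(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String item = queue.take();
                    if (item == POISON_PILL) {
                        break; // marker seen: no more input will arrive
                    }
                    processed.add(item); // stands in for real processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Queues all items, then the marker, and waits for the handler to finish.
    static List<String> runDemo(List<String> items) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        Handler handler = new Handler(queue);
        Thread thread = new Thread(handler);
        thread.start();
        for (String item : items) {
            queue.put(item);
        }
        queue.put(POISON_PILL); // queued last, so every real item is taken first
        thread.join();
        return handler.processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(Arrays.asList("a", "b", "c")));
    }
}
```

Because the queue is FIFO and the marker is queued last, the handler processes every item before it stops, which plain interruption does not guarantee.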


4 Comments

This is also known as the 'Poison Pill Shutdown' approach and is discussed at length in "Java Concurrency in Practice", specifically on pp. 155-156.
Would this approach work if there are multiple consumers?
Each terminating consumer must re-insert the terminating object into the queue.
@RajatAggarwal I believe it doesn't work in case you have multiple consumers. Instead, you can use the method poll(timeout, TimeUnit) to make your consumers wait for a certain time.
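For the multiple-consumer case raised in these comments, re-inserting the terminating object might look roughly like the sketch below. The class and method names are hypothetical, and an unbounded `LinkedBlockingQueue` is assumed so that putting the marker back never blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class MultiConsumerPillDemo {
    // Hypothetical marker object, recognized by identity (==).
    static final String POISON_PILL = new String("POISON_PILL");

    // Starts consumerCount consumers, feeds itemCount items plus one marker,
    // and returns how many real items were processed in total.
    static int runDemo(int consumerCount, int itemCount) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        List<Thread> consumers = new ArrayList<>();

        for (int c = 0; c < consumerCount; c++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();
                        if (item == POISON_PILL) {
                            queue.put(POISON_PILL); // hand the marker to the next consumer
                            return;
                        }
                        processed.incrementAndGet(); // stands in for real processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            consumers.add(t);
        }

        for (int i = 0; i < itemCount; i++) {
            queue.put("item-" + i);
        }
        queue.put(POISON_PILL); // a single marker is enough; consumers pass it along

        for (Thread t : consumers) {
            t.join();
        }
        return processed.get();
    }
}
```

One marker stays in the queue after the last consumer exits. An alternative under the same assumptions is for the producer to queue one marker per consumer, which avoids the re-insert.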
16
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
    MyObjHandler handler = new MyObjHandler(queue);
    Thread thread = new Thread(handler);
    thread.start();

    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
        queue.put(i.next());
    }

    // take() in the handler throws InterruptedException, ending its loop
    thread.interrupt();

However, if you do this, the thread might be interrupted while there are still items in the queue waiting to be processed. You might want to consider using poll instead of take, which allows the processing thread to time out and terminate once it has waited for a while with no new input.
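The poll suggestion could be sketched as follows. `PollTimeoutDemo` and `drainUntilIdle` are hypothetical names, and the idle timeout is an assumption that has to be tuned to how long the producer can legitimately pause between items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class PollTimeoutDemo {
    // Processes items until none arrives within idleMillis, then returns
    // everything that was processed.
    static List<String> drainUntilIdle(BlockingQueue<String> queue, long idleMillis)
            throws InterruptedException {
        List<String> processed = new ArrayList<>();
        String item;
        // poll returns null when the timeout elapses with no element available
        while ((item = queue.poll(idleMillis, TimeUnit.MILLISECONDS)) != null) {
            processed.add(item); // stands in for real processing
        }
        return processed;
    }
}
```

The trade-off is that a slow producer can be mistaken for a finished one, so a timeout alone is only safe when a long silence really does mean the input is complete.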

3 Comments

Yes, it's a problem if the thread is interrupted while there are still items in the queue. To get around this, I added code to make sure the queue is empty before interrupting the thread: while (queue.size()>0) Thread.currentThread().sleep(5000);
@MCS - note for future visitors that your approach here is a hack and should not be reproduced in production code for three reasons. It's always preferred to find an actual way to hook the shutdown. It's never acceptable to use Thread.sleep() as an alternative to a proper hook. In other implementations, other threads may be placing things into the queue, and the while loop may never end.
Thankfully, there's no need to rely on such hacks, because one can just as easily process the remaining items after the interrupt if necessary. For example, an "exhaustive" take() implementation might look like: try { return take(); } catch (InterruptedException e) { E o = poll(); if (o == null) throw e; Thread.currentThread().interrupt(); return o; } However, there's no reason it needs to be implemented at this layer, and implementing it slightly higher up will lead to more efficient code (such as by avoiding the per-element InterruptedException and/or by using BlockingQueue.drainTo()).
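One way to handle this "slightly higher up", as the last comment suggests, is to treat the interrupt as the end-of-input signal and then empty the queue with drainTo(). This is only a sketch under that assumption; `DrainOnInterruptDemo`, `Handler` and `runDemo` are hypothetical names, and Integer items are used for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class DrainOnInterruptDemo {
    static class Handler implements Runnable {
        private final BlockingQueue<Integer> queue;
        final List<Integer> processed = new ArrayList<>();

        Handler(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    processed.add(queue.take()); // stands in for real processing
                }
            } catch (InterruptedException e) {
                // Assumption: the interrupt means "no more input".
                // Process whatever is still queued in one batch, then exit.
                List<Integer> rest = new ArrayList<>();
                queue.drainTo(rest);
                processed.addAll(rest);
                Thread.currentThread().interrupt(); // keep the flag for callers
            }
        }
    }

    // Queues 0..n-1, interrupts the handler, and returns what it processed.
    static List<Integer> runDemo(int n) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);
        Handler handler = new Handler(queue);
        Thread thread = new Thread(handler);
        thread.start();
        for (int i = 0; i < n; i++) {
            queue.put(i);
        }
        thread.interrupt(); // all items are already queued at this point
        thread.join();
        return handler.processed;
    }
}
```

This only works if the producer interrupts after its last put() and nothing else adds to the queue afterwards; otherwise items queued after the drain would be lost.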
15

Very late, but I hope this helps others too. I faced a similar problem and used the poll approach suggested by erickson above, with some minor changes:

    class MyObjHandler implements Runnable {
        private final BlockingQueue<MyObj> queue;
        // volatile guarantees the updated value is visible to all threads
        public volatile boolean Finished;

        public MyObjHandler(BlockingQueue<MyObj> queue) {
            this.queue = queue;
            Finished = false;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                    // process a pending job first, then check whether to return
                    if (obj != null) {
                        // process obj here
                        // ...
                    }
                    if (Finished && queue.isEmpty()) {
                        return;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                    return;
                }
            }
        }
    }

    public void testHandler() throws InterruptedException {
        BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
        MyObjHandler handler = new MyObjHandler(queue);
        Thread myThread = new Thread(handler);
        myThread.start();

        // get objects for handler to process
        for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
            queue.put(i.next());
        }

        // what code should go here to tell the handler to stop waiting for more objects?
        handler.Finished = true; // this tells the handler to stop

        // wait for termination if you need to; otherwise remove the join
        myThread.join();
    }

This solved both problems:

  1. The handler is flagged, so it knows it no longer has to wait for more elements.
  2. The thread is not interrupted midway, so the processing loop terminates only when all the items in the queue have been processed and no more items remain to be added.

3 Comments

Make Finished variable volatile to guarantee visibility between threads. See stackoverflow.com/a/106787
If I'm not mistaken, the final element in the queue will not be processed. When you take the final element from the queue, finished is true and the queue is empty, so it will return before having handled that final element. Add a third condition if (Finished && queue.isEmpty() && obj == null)
@MattR Thanks, good advice. I will edit the answer.
4

Interrupt the thread:

thread.interrupt() 


1

Or don't interrupt; it's nasty.

    public class MyQueue<T> extends ArrayBlockingQueue<T> {
        private static final long serialVersionUID = 1L;

        private volatile boolean done = false;

        public MyQueue(int capacity) {
            super(capacity);
        }

        // marks production as finished and wakes any consumer waiting in take()
        public synchronized void done() {
            done = true;
            notifyAll();
        }

        public boolean isDone() {
            return done;
        }

        /**
         * May return null if producer ends the production after consumer
         * has entered the element-await state.
         */
        @Override
        public T take() throws InterruptedException {
            T el;
            synchronized (this) {
                // poll inside the monitor so a notify() cannot be missed
                // between the poll and the wait
                while ((el = super.poll()) == null && !done) {
                    wait();
                }
            }
            return el;
        }
    }
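  1. When the producer puts an object on the queue, it calls queue.notify() (inside a synchronized (queue) block, since notify() requires holding the object's monitor); when production ends, it calls queue.done().
  2. The consumer loops while (!queue.isDone() || !queue.isEmpty()).
  3. The consumer tests the take() return value for null.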

4 Comments

I would say that the previous solution was cleaner and simpler than this one.
The done flag is the same as a poison pill; it's just administered differently :)
Cleaner? I doubt it. You don't know exactly when the thread is interrupted. Or let me ask, what is cleaner about it? It's less code, that's a fact.
You would probably want to make your done boolean volatile if you intend to use the class from multiple threads.
