I am new to Java concurrency and have run into a strange problem. I read from a large file and use several worker threads to process the input (some complicated string-matching tasks). I use a LinkedBlockingQueue to pass the data to the worker threads, and a volatile boolean flag in the worker class to signal that the end of the file has been reached.

However, I cannot get the worker threads to stop properly. CPU usage drops to almost zero at the end, but the program never terminates.

The simplified code is below. I have removed the real code and replaced it with a simple character counter, but the effect is the same: the worker threads do not terminate after the whole file has been processed and the main thread has set the boolean flag to true.

The class with main

    public class MultiThreadTestEntry {
        private static String inputFileLocation = "someFile";
        private static int numbOfThread = 3;

        public static void main(String[] args) {
            int i = 0;
            Worker[] workers = new Worker[numbOfThread];
            Scanner input = GetIO.getTextInput(inputFileLocation);
            String temp = null;
            ExecutorService es = Executors.newFixedThreadPool(numbOfThread);
            LinkedBlockingQueue<String> dataQueue = new LinkedBlockingQueue<String>(1024);

            for (i = 0; i < numbOfThread; i++) {
                workers[i] = new Worker(dataQueue);
                workers[i].setIsDone(false);
                es.execute(workers[i]);
            }

            try {
                while (input.hasNext()) {
                    temp = input.nextLine().trim();
                    dataQueue.put(temp);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            input.close();

            for (i = 0; i < numbOfThread; i++) {
                workers[i].setIsDone(true);
            }

            es.shutdown();

            try {
                es.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

The worker class

    public class Worker implements Runnable {
        private LinkedBlockingQueue<String> dataQueue = null;
        private volatile boolean isDone = false;

        public Worker(LinkedBlockingQueue<String> dataQueue) {
            this.dataQueue = dataQueue;
        }

        @Override
        public void run() {
            String temp = null;
            long count = 0;
            System.out.println(Thread.currentThread().getName() + " running...");

            try {
                while (!isDone || !dataQueue.isEmpty()) {
                    temp = dataQueue.take();
                    count = temp.length() + count;

                    if (count % 1000 == 0) {
                        System.out.println(Thread.currentThread().getName() + " : " + count);
                    }
                }

                System.out.println("Final result: " + Thread.currentThread().getName() + " : " + count);
            } catch (InterruptedException e) {
            }
        }

        public void setIsDone(boolean isDone) {
            this.isDone = isDone;
        }
    }

Any suggestions as to why this happens?

Thank you very much.

  • Here's a crazy suggestion, just because I've been looking at the API just now. Try an AtomicBoolean instead and see what happens? Apparently they are concurrency-ready -- I didn't post this as an answer because I honestly have no idea what the outcome will be. docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/… Commented May 18, 2015 at 1:20
  • This happens because take() on an empty queue waits until an element is available. See the answers to this question for ways to accomplish what you're trying to do while still using take(). Commented May 18, 2015 at 1:28

1 Answer

As Dan Getz already said, take() in your worker waits until an element becomes available, and the queue may already be empty when it is called.

In your code you check whether the queue is empty, but nothing prevents another worker from taking an element from the queue after that check.

If the queue contains only one element and t1 and t2 are two threads, the following could happen:

    t2.isEmpty(); // -> false
    t1.isEmpty(); // -> false
    t2.take();    // now the queue is empty
    t1.take();    // wait forever

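In this case t1 would wait "forever". The same happens to a worker that is already blocked in take() on an empty queue when the main thread sets isDone: it never wakes up to re-check the flag.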
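To see the blocking behaviour in isolation, here is a minimal sketch. The class name BlockingTakeDemo and the helper consumerStillBlocked are made up for illustration and are not part of the code in the question. It starts one consumer that calls take() on a queue nothing is ever put into, waits a short time, and reports whether the consumer is still alive.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original program.
class BlockingTakeDemo {

    /**
     * Starts a consumer that calls take() on an empty queue and returns
     * true if that consumer is still blocked after waitMillis milliseconds.
     */
    static boolean consumerStillBlocked(long waitMillis) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                queue.take(); // nothing is ever put, so this call blocks
            } catch (InterruptedException e) {
                // Interruption is the only way out of take() in this demo.
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        consumer.join(waitMillis);            // wait a while for it to finish
        boolean blocked = consumer.isAlive(); // still waiting inside take()?

        consumer.interrupt();                 // release the thread so the JVM can exit
        consumer.join();
        return blocked;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("still blocked: " + consumerStillBlocked(500));
    }
}
```

A flag such as isDone cannot help a thread in this state, because the thread never gets back to the loop condition that reads the flag.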

You can avoid this by using poll with a timeout instead of take and checking whether the result is null:

    public void run() {
        String temp = null;
        long count = 0;
        System.out.println(Thread.currentThread().getName() + " running...");

        try {
            while (!isDone || !dataQueue.isEmpty()) {
                temp = dataQueue.poll(2, TimeUnit.SECONDS);

                if (temp == null)
                    // re-check if this was really the last element
                    continue;

                count = temp.length() + count;

                if (count % 1000 == 0) {
                    System.out.println(Thread.currentThread().getName() + " : " + count);
                }
            }

            System.out.println("Final result: " + Thread.currentThread().getName() + " : " + count);
        } catch (InterruptedException e) {
            // here it is important to restore the interrupted flag!
            Thread.currentThread().interrupt();
        }
    }
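If you would rather keep take(), a different technique from the one above is the commonly used "poison pill" pattern: the producer puts one sentinel object per worker into the queue after the last real line, and each worker stops when it takes a sentinel. The following is only a sketch under assumptions. The class name PoisonPillDemo, the method countChars, and the sentinel text are made up for illustration, and the input is an in-memory list rather than your file.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class, not part of the original program.
class PoisonPillDemo {

    // Sentinel object (assumed text). It is compared by identity, so a real
    // input line with the same characters is not mistaken for it.
    private static final String POISON = new String("<<EOF>>");

    /** Sums the lengths of all lines using numWorkers consumer threads. */
    static long countChars(List<String> lines, int numWorkers) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);
        ExecutorService es = Executors.newFixedThreadPool(numWorkers);
        AtomicLong total = new AtomicLong();

        for (int i = 0; i < numWorkers; i++) {
            es.execute(() -> {
                long count = 0;
                try {
                    while (true) {
                        String line = queue.take(); // blocking is fine here
                        if (line == POISON) {       // identity check on purpose
                            break;
                        }
                        count += line.length();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                total.addAndGet(count);
            });
        }

        for (String line : lines) {
            queue.put(line);
        }
        // One pill per worker: each worker consumes exactly one and exits.
        for (int i = 0; i < numWorkers; i++) {
            queue.put(POISON);
        }

        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
        return total.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countChars(Arrays.asList("alpha", "beta", "gamma"), 3)); // prints 14
    }
}
```

Because every worker is guaranteed to receive a pill eventually, no flag and no isEmpty() check are needed, and there is no timeout to tune.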