
I'm writing some code that reads log lines and does some processing on that data in the background. This processing would probably benefit from parallelization, such as what is offered by Stream.parallel(), and I was attempting to use it. This is the code I started out with, which works perfectly.

public static void main(String[] args) {
    try {
        final Socket socket = new Socket(ADDRESS, PORT);
        final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        socket.getOutputStream().write(QUERY);
        reader.lines().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

This code connects and prints out all of my data. I would very much like to restructure this code as follows:

public static void main(String[] args) {
    try (Socket socket = new Socket(ADDRESS, PORT);
         BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
        socket.getOutputStream().write(QUERY);
        reader.lines().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

But sadly this doesn't work. Even worse, going back to the original code, this doesn't even work:

public static void main(String[] args) {
    try {
        final Socket socket = new Socket(ADDRESS, PORT);
        final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        socket.getOutputStream().write(QUERY);
        reader.lines().parallel().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

All that was added here was a .parallel() call, and with it the code doesn't work at all. It just sits there and nothing is printed.

I can live perfectly well without the second version using the try-with-resources form, try (A a = new A()) {}, as that doesn't look too good in this case. What I can't live without is figuring out why this .parallel() call breaks everything.

I'm assuming the modified try statement is closing the streams as soon as I fall out of them (right after we start the forEach) so they are being killed and GC'd before operation. I can't for the life of me figure out what the hell is going on with the .parallel call.

As requested, here is the output of jstack running on the .parallel() version of this code.

Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode):

"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007fd4f4001000 nid=0x4907 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007fd5280be000 nid=0x48d2 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007fd5280bb000 nid=0x48d1 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fd5280b9800 nid=0x48d0 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fd5280b6800 nid=0x48cf waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007fd5280b5000 nid=0x48ce runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007fd528082000 nid=0x48cd in Object.wait() [0x00007fd515c6d000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        - locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007fd52807d800 nid=0x48cc in Object.wait() [0x00007fd515d6e000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
        - locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x00007fd528008000 nid=0x48c2 runnable [0x00007fd52fd9f000]
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        - locked <0x00000000ec086790> (a java.net.SocksSocketImpl)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at java.net.Socket.<init>(Socket.java:434)
        at java.net.Socket.<init>(Socket.java:211)
        at com.gravypod.Test.main(Test.java:48)

"VM Thread" os_prio=0 tid=0x00007fd528075800 nid=0x48ca runnable

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007fd52801d800 nid=0x48c4 runnable

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007fd52801f000 nid=0x48c5 runnable

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007fd528021000 nid=0x48c6 runnable

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007fd528022800 nid=0x48c7 runnable

"VM Periodic Task Thread" os_prio=0 tid=0x00007fd5280c0800 nid=0x48d3 waiting on condition

JNI global references: 18

Line Test.java:48 is the Socket socket = new Socket(...) line. For comparison, this is the jstack output of the fully working non-parallel code (just using .lines()).

Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode):

"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007f9048001000 nid=0x4982 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007f90800be800 nid=0x496f runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007f90800bb000 nid=0x496e waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f90800b9800 nid=0x496d waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f90800b6800 nid=0x496c waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f90800b5000 nid=0x496b runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f9080082000 nid=0x496a in Object.wait() [0x00007f907018d000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        - locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f908007d800 nid=0x4969 in Object.wait() [0x00007f907028e000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
        - locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x00007f9080008000 nid=0x4961 runnable [0x00007f90884c3000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:170)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        - locked <0x00000000ec08e890> (a java.io.InputStreamReader)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        - locked <0x00000000ec08e890> (a java.io.InputStreamReader)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at java.io.BufferedReader$1.hasNext(BufferedReader.java:571)
        at java.util.Iterator.forEachRemaining(Iterator.java:115)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
        at com.gravypod.Test.main(Test.java:51)

"VM Thread" os_prio=0 tid=0x00007f9080075800 nid=0x4968 runnable

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f908001d800 nid=0x4963 runnable

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f908001f000 nid=0x4964 runnable

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f9080021000 nid=0x4965 runnable

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f9080022800 nid=0x4966 runnable

"VM Periodic Task Thread" os_prio=0 tid=0x00007f90800c1000 nid=0x4970 waiting on condition

JNI global references: 319

The line Test.java:51 is the reader.lines().forEach line.

  • What are the relative speeds of (a) reading all the data from the socket and (b) doing all the processing on the various "lines" you read from the socket? If (a) takes longer than, or at least not much less time than, (b), you can't effectively parallelize this, since (a) is inherently serial and will be the bottleneck. If (b) is long you can do it, but just break it into two steps: the reading of all the lines (single-threaded), and the parallel processing of the lines. Then you can overlap them a bit to extract a bit more performance. Commented Jan 3, 2017 at 16:40
  • To diagnose your deadlock issue, take a jstack of the process when it hangs and post it. It is perhaps also easier to replace the socket and BufferedReader with a StringReader and see if the problem still exists, since that would allow you to post an MCVE. Commented Jan 3, 2017 at 16:42
  • So what's the answer? How long does the reading (typically) take? How long does the subsequent parsing (typically) take? I can't determine that based on the description, since the network could reasonably vary between 500 Kbps and 5,000,000 Kbps, and the processing could vary in a similar range. Commented Jan 3, 2017 at 16:46
  • Also, is the example that's hanging the actual example above with System.out::println, or is it something where the method in forEach() is much more complicated? If the latter, we don't really have a hope of diagnosing it. Commented Jan 3, 2017 at 16:47
  • I have the feeling you are not showing the complete picture here. The parallel() version blocks on the socket connect, which could easily be explained if the server only supports a single connection and you still have the other version running. If you have multiple parallel() versions in the same JVM, they will fill up the ForkJoinThreadPool and eventually block. Can you share the >30 other lines in your class? Commented Jan 3, 2017 at 19:41

2 Answers

1

I imagine that parallel() or the forEach() on a parallel stream waits to read all the input before parallelising the task. Because the server never closes the connection, it will wait forever.

Your task is not really parallelizable. Data comes sequentially over the wire, so reading it in parallel cannot work.


8 Comments

That is not the case here, as .lines() does not block to read all input. Its doc string also says: "Returns a Stream, the elements of which are lines read from this BufferedReader. The Stream is lazily populated, i.e., read only occurs during the terminal stream operation."
It's the forEach that blocks. It will wait for all threads to finish, but some of them will never finish since the socket is never closed. Try closing the socket on the server, and you'll see the result.
@Lucian - in that case it would block in the non-parallel case too. If the underlying InputStream never terminates (and blocks instead), lines() will always ultimately block, parallel or not.
You read the lines sequentially, like in your first example. You submit them to a ThreadPool to be executed (i.e. you create a Runnable that does something with a line). Depending on how many threads you have in your ThreadPool, they will be handled in parallel.
@BeeOnRope: that’s acknowledged by this comment. Though, there is a technical difference between “blocking (forever)” and “waiting very long (until the OP gives up)”. Since the stream source is inherently sequential, the underlying implementation will try to buffer elements to enable parallel processing and the number it tries to buffer is insanely large. See Reader#lines() parallelizes badly due to nonconfigurable batch size…
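The approach described in the comments above (read lines sequentially, hand each one to a thread pool) might be sketched as follows. This is only an illustration: process is a hypothetical stand-in for the real per-line work, the class name is invented, and a StringReader replaces the socket so that the example is self-contained.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SequentialReadParallelWork {

    // Hypothetical stand-in for the real per-line processing.
    static String process(String line) {
        return line.toUpperCase();
    }

    // Reads lines one at a time on the calling thread and submits each line to the
    // pool as soon as it arrives, so processing starts without buffering more input.
    static List<String> processAll(BufferedReader reader, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> pending = new ArrayList<>();
            String line;
            while ((line = reader.readLine()) != null) {
                final String current = line;
                Callable<String> task = () -> process(current);
                pending.add(pool.submit(task));
            }
            // Collect results in the order the lines were read.
            List<String> results = new ArrayList<>();
            for (Future<String> future : pending) {
                results.add(future.get());
            }
            return results;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new StringReader("alpha\nbeta\ngamma\n"));
        System.out.println(processAll(reader, 4)); // prints [ALPHA, BETA, GAMMA]
    }
}
```

For a connection that never closes, collecting every Future in a list would grow without bound; in that case the tasks would have to deliver their results themselves (for example by printing or writing to a queue) instead of being collected at the end.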
0

It seems your application isn’t technically hanging, but just waiting for a lot of input before performing any observable work. This is a combination of two implementation details. When you start a parallel stream operation, it will first try to split the workload until every CPU core has something to do, before actually starting to process elements. This combines with the issue described in “Reader#lines() parallelizes badly due to nonconfigurable batch size”.

Simply put, when a Stream has an unknown size, the implementation will try to buffer batches whose sizes are multiples of 1024, growing with each split. This great answer shows how the splitting happens for a stream of unknown size on multiple cores, with multiples of 1024 elements being buffered in the process. This can take a very long time before the consumer passed to forEach is ever invoked.
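The batching can be observed directly with a small sketch. It assumes the OpenJDK implementation of Spliterators.spliteratorUnknownSize (the same kind of spliterator that appears as Spliterators$IteratorSpliterator in the question's stack trace); other implementations may batch differently. The class name and the counting iterator are made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;

class BatchSizeDemo {

    // Splits a spliterator of unknown size the given number of times and records
    // how many elements each trySplit() call pulled from the underlying iterator.
    static List<Long> pulledPerSplit(int splits) {
        AtomicLong pulled = new AtomicLong();
        // An endless source that counts how many elements have been requested.
        Iterator<Long> endless = new Iterator<Long>() {
            @Override public boolean hasNext() { return true; }
            @Override public Long next() { return pulled.incrementAndGet(); }
        };
        Spliterator<Long> source = Spliterators.spliteratorUnknownSize(
                endless, Spliterator.ORDERED | Spliterator.NONNULL);

        List<Long> sizes = new ArrayList<>();
        long before = 0;
        for (int i = 0; i < splits; i++) {
            source.trySplit(); // buffers one batch into an array-backed prefix
            long now = pulled.get();
            sizes.add(now - before);
            before = now;
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(pulledPerSplit(3));
    }
}
```

On OpenJDK the first split is expected to buffer 1024 elements and the second 2048, so with a slow source such as a socket that delivers lines only occasionally, no element reaches the consumer until at least the first batch has been filled.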

Note that processing an infinite source via the non-short-circuiting forEach is outside the scope of the Stream API anyway. Expecting a timely side effect is an assumption about the processing order of the Stream, and there is no guarantee about that order.

This answer guides you to a work-around. You can use something like

try (Socket socket = new Socket(ADDRESS, PORT);
     BufferedReader reader = new BufferedReader(
         new InputStreamReader(socket.getInputStream()))) {

    socket.getOutputStream().write(QUERY);
    Stream.generate(() -> {
        try {
            return reader.readLine();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }).parallel().forEach(System.out::println);
} catch (IOException | UncheckedIOException e) {
    e.printStackTrace();
}

But, as said, this isn’t the intended use case of the Stream API…
