4

This answer provides an implementation for partitioning an IntStream:

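    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.function.Predicate;
    import java.util.stream.IntStream;

    // Assumed declaration: r is used below but not defined in the snippet as quoted.
    Random r = new Random();
    IntStream intStream = IntStream.iterate(0, i -> i + 1).limit(1000000);
    Predicate<Integer> p = x -> r.nextBoolean();
    Map<Boolean, List<Integer>> groups = intStream.collect(
        // Supplier: creates a fresh result container.
        () -> {
            Map<Boolean, List<Integer>> map = new HashMap<>();
            map.put(false, new ArrayList<>());
            map.put(true, new ArrayList<>());
            return map;
        },
        // Accumulator: adds one element to the list for its partition.
        (map, x) -> {
            boolean partition = p.test(x);
            List<Integer> list = map.get(partition);
            list.add(x);
        },
        // Combiner: merges the second partial result into the first.
        (map1, map2) -> {
            map1.get(false).addAll(map2.get(false));
            map1.get(true).addAll(map2.get(true));
        });
    System.out.println(groups.get(false).size());
    System.out.println(groups.get(true).size());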

But its edit mentions that this implementation is not thread safe. As far as I can see, however, the collector creates a separate HashMap<Boolean, List<Integer>> for each thread in a parallel stream, so each map is confined to a single thread. The partitioning function is confined to a single thread as well. The merging function merges the results from several threads, but as far as I know the stream framework makes sure merging is done in a thread-safe manner. So my question: is this solution really not thread safe?

BTW: The answer provides a more elegant solution anyway (Stream<Integer> stream = intStream.boxed(); etc), but I would still like to know.

PS: I would have liked to add this question as a comment to the original post, but I don't even have the reputation to add comments... :|

1 Comment

The linked answer has been corrected. Commented Apr 16, 2020 at 16:11

2 Answers

4

According to Oracle's documentation:

Like reduce(int, IntBinaryOperator), collect operations can be parallelized without requiring additional synchronization.

https://docs.oracle.com/javase/8/docs/api/java/util/stream/IntStream.html#collect-java.util.function.Supplier-java.util.function.ObjIntConsumer-java.util.function.BiConsumer-

So it seems like your intuition is right that this is thread-safe.


2 Comments

Surely they don't mean that the whole block is synchronized, that would kill concurrency. They probably mean it is thread safe from the perspective of the stream. The list.add(x) call is not thread safe. If it can be called concurrently there is a problem!
@ewramner it’s not synchronized, but each worker thread will have its own ArrayList or, in this specific example, its own two ArrayLists. See the Mutable Reduction section of the documentation. So the program logic ensures that list.add(x) is never invoked concurrently on the same list. See also the difference between concurrent and non-concurrent collectors.
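
To make the concurrent versus non-concurrent distinction concrete, here is a minimal sketch. It assumes a deterministic even/odd predicate instead of the random one, and it uses the standard collectors Collectors.partitioningBy (non-concurrent) and Collectors.groupingByConcurrent (concurrent) as stand-ins; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CollectorKindsDemo {
    // Non-concurrent collector: each worker accumulates into its own
    // container, and the partial containers are merged afterwards.
    static Map<Boolean, List<Integer>> nonConcurrent(int n) {
        return IntStream.range(0, n).parallel().boxed()
            .collect(Collectors.partitioningBy(x -> x % 2 == 0));
    }

    // Concurrent, unordered collector: workers may accumulate into one
    // shared ConcurrentMap, so the order inside each list is not guaranteed.
    static ConcurrentMap<Boolean, List<Integer>> concurrent(int n) {
        return IntStream.range(0, n).parallel().boxed()
            .collect(Collectors.groupingByConcurrent(x -> x % 2 == 0));
    }

    public static void main(String[] args) {
        System.out.println(nonConcurrent(1_000).get(true).size());
        System.out.println(concurrent(1_000).get(true).size());
    }
}
```

Both variants should yield the same group sizes. The three-argument collect from the question behaves like the first variant: it never shares a container between workers.
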
0

Well, Random is thread safe, but it should not be used this way because it performs poorly under contention. ArrayList, however, is not thread safe, and with a parallel stream you will call list.add(x) from multiple threads.
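
If contention on a shared Random is the concern, one possible workaround is ThreadLocalRandom, which gives each thread its own generator. The sketch below is an assumption and is not taken from the linked answer. It also uses boxed() with Collectors.partitioningBy rather than the three-argument collect, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class RandomPartitionDemo {
    // Splits 0..n-1 into two groups at random, in parallel.
    static Map<Boolean, List<Integer>> randomPartition(int n) {
        return IntStream.range(0, n)
            .parallel()
            .boxed()
            // ThreadLocalRandom.current() returns the calling thread's own
            // generator, so worker threads do not compete for a shared seed.
            .collect(Collectors.partitioningBy(
                x -> ThreadLocalRandom.current().nextBoolean()));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> groups = randomPartition(1_000_000);
        System.out.println(groups.get(false).size());
        System.out.println(groups.get(true).size());
    }
}
```

The two sizes differ from run to run, but they should always add up to the number of elements in the stream.
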

1 Comment

Thanks for your response. My point is that each worker thread in a parallel stream creates its own map with accompanying array lists. (That's why you provide a supplier instead of a map itself.) So all these maps and lists are confined to their own worker thread, which makes this solution thread safe.
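
The confinement argument can be checked empirically by counting how often the supplier runs. This is a minimal sketch, assuming a deterministic even/odd predicate in place of the random one so that the result is checkable; the class, method, and counter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class ConfinementDemo {
    // Counts how many result containers the stream framework requested.
    static final AtomicInteger containersCreated = new AtomicInteger();

    static Map<Boolean, List<Integer>> partitionEvenOdd(int n) {
        return IntStream.range(0, n).parallel().collect(
            // Supplier: called once per partial result, not once overall.
            () -> {
                containersCreated.incrementAndGet();
                Map<Boolean, List<Integer>> map = new HashMap<>();
                map.put(false, new ArrayList<>());
                map.put(true, new ArrayList<>());
                return map;
            },
            // Accumulator: only touches the container it was handed.
            (map, x) -> map.get(x % 2 == 0).add(x),
            // Combiner: folds the right partial result into the left one.
            (left, right) -> {
                left.get(false).addAll(right.get(false));
                left.get(true).addAll(right.get(true));
            });
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> groups = partitionEvenOdd(1_000_000);
        System.out.println("containers created: " + containersCreated.get());
        System.out.println(groups.get(true).size() + groups.get(false).size());
    }
}
```

On a multi-core machine the counter typically reports more than one container, while the two list sizes always add up to the number of elements. This is an observation rather than a proof; the guarantee itself comes from the Mutable Reduction contract in the documentation.
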
