
When I run the following code, only 2 of the 8 available threads run. Can anyone explain why that is the case, and how I can change the code so that it takes advantage of all 8 threads?

Tree.java:

```java
package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) && Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" + "data=" + data + ", subTrees=" + subTrees + '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//                .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//                .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}
```

(It doesn't matter whether I use forEach or reduce.)

Main.java:

```java
package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());
        final Tree<Integer> root = new Tree<>(null, Set.of(new Tree<>(1,
                IntStream.range(2, 7)
                        .boxed()
                        .map(Tree::new)
                        .collect(Collectors.toSet()))));
        root.sendCommandAll();
//        IntStream.generate(() -> 1)
//                .parallel()
//                .forEach(i ->
//                {
//                    System.out.println(Thread.currentThread().getName());
//                    try
//                    {
//                        Thread.sleep(5000);
//                    } catch (InterruptedException e)
//                    {
//                        e.printStackTrace();
//                    }
//                });
    }
}
```

In the main method I create a tree with the following structure:

```
root (data is null)
|- 1
   |- 2
   |- 3
   |- 4
   |- 5
   |- 6
```

The sendCommandAll method processes every sub-tree (in parallel) only after its parent has finished being processed. But the result is as follows:

```
Processors: 8
[main] sending command to 1
[main] tree with data 1 got true
[main] sending command to 6
[ForkJoinPool.commonPool-worker-2] sending command to 5
[main] tree with data 6 got true
[ForkJoinPool.commonPool-worker-2] tree with data 5 got true
[ForkJoinPool.commonPool-worker-2] sending command to 4
[ForkJoinPool.commonPool-worker-2] tree with data 4 got true
[ForkJoinPool.commonPool-worker-2] sending command to 3
[ForkJoinPool.commonPool-worker-2] tree with data 3 got true
[ForkJoinPool.commonPool-worker-2] sending command to 2
[ForkJoinPool.commonPool-worker-2] tree with data 2 got true
```

(For the record, when I execute the commented-out code in Main.java, the JVM uses all 7 (+ 1) threads available in the common pool.)
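For reference, the common pool's target parallelism can be queried directly rather than inferred from the log output; a quick sketch (class name is mine):

```java
import java.util.concurrent.ForkJoinPool;

public class PoolInfo
{
    public static void main(String[] args)
    {
        // The common pool defaults to availableProcessors() - 1 workers;
        // the calling thread also participates in parallel stream work,
        // hence "7 (+ 1)" on an 8-processor machine.
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        System.out.println("Processors: "
                + Runtime.getRuntime().availableProcessors());
    }
}
```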

How can I improve my code?

  • Check this question: Parallel stream from a hashset doesn't run in parallel Commented Oct 13, 2021 at 12:32
  • I don't think I've encountered the same problem: first of all, I use JDK 17, and even when I use a custom ForkJoinPool with a parallelism of 20, only 2 threads are active. Commented Oct 13, 2021 at 13:33
  • As the answer says - it's not guaranteed to work. Also, if I rewrite your code to use List instead of Set I see more threads used from the pool. Commented Oct 13, 2021 at 14:05
    As explained in the second half of this answer, HashMaps (and in turn HashSets) with a small number of elements, compared to their (default) capacity may distribute their work badly, depending on the hash code distribution. You can work around this using new ArrayList<>(subTrees).parallelStream() but there are other flaws in your approach, like doing the work/wait before even starting to traverse the children. You should separate the iteration logic from the actual action. Commented Oct 13, 2021 at 14:10
  • Thank you @Holger, it indeed solved my problem. Can you rewrite your comment as an official answer so you get the credit and the points? :-) Commented Oct 13, 2021 at 14:55

1 Answer


As explained in the second half of this answer, the thread utilization when processing HashMaps or HashSets depends on the distribution of the elements within the backing array, which in turn depends on their hash codes. Especially with a small number of elements compared to the (default) capacity, this may result in bad work splitting.

A simple work-around is using new ArrayList<>(subTrees).parallelStream() instead of subTrees.parallelStream().
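Applied to the question's class, that is a one-line change. A minimal, testable sketch of the idea (the class and the recording set are mine, standing in for the real per-node work):

```java
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ArrayListWorkaround
{
    // Hypothetical stand-in for sendCommandAll's side effects: records
    // each element so we can verify every child was processed.
    static final Set<Integer> processed = ConcurrentHashMap.newKeySet();

    static void process(Set<Integer> subTrees)
    {
        // The work-around: copy the HashSet into an ArrayList before
        // streaming. ArrayList's spliterator splits its densely packed
        // backing array into equal halves, so the elements distribute
        // evenly across worker threads.
        new ArrayList<>(subTrees).parallelStream()
                .forEach(processed::add);
    }
}
```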

But note that your method performs the actual work of the current node (simulated with a sleep in the example) before processing the children, which also reduces the potential parallelism.

You may use

```java
public void sendCommandAll()
{
    if (subTrees.isEmpty())
    {
        actualSendCommand();
        return;
    }
    List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
    tmp.addAll(subTrees);
    tmp.add(this);
    tmp.parallelStream().forEach(t -> {
        if (t != this) t.sendCommandAll();
        else t.actualSendCommand();
    });
}

private void actualSendCommand()
{
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
    try
    {
        Thread.sleep(5000);
    } catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
}
```

This allows the current node to be processed concurrently with the processing of its children.


11 Comments

Then, the simple solution new ArrayList<>(subTrees).parallelStream() will do. I’ll keep the other solution for future readers who might not have this constraint…
Did you read the linked answer? The challenge is to split the work without spending too much time in analyzing the situation. I debugged your case and the elements were indeed all clustered in one region of the array, so splitting the array into equal sized ranges leads to several all-empty ranges. The TreeMap shouldn’t have this issue as the nodes are already (almost) balanced, which allows recursively passing one half to another worker thread, however, it’s not perfectly balanced which may still lead to lesser parallelism for small sets.
@RoyAsh It's not a bug, and it's not specific to hash-based collections. Parallel processing is a tradeoff; splitting and joining has costs, which we hope to overcome by throwing more CPUs at the problem. In general, splitting down to one element is not optimal (and, using parallelism on small data sets is not optimal either.) The splitting heuristics are tuned for effective parallelism on large data sets with largely CPU-bound computations.
@RoyAsh Try LinkedList, it's even worse. It's a complicated function of the splitting heuristics and the collection topology and the quality of the Spliterator implementation.
@RoyAsh ArrayList knows that all elements are stored in its backing array from index zero to four. It’s easy to distribute them to workers. But HashSet has a backing array of length sixteen by default and the five elements are stored somewhere in that array, depending on their hash codes. Then, it gets the task “select (roughly) half of them for another worker” without actually iterating. Mind that the algorithm must be suitable to array lengths of 100 millions too. So it takes half of that array, hoping to catch closely half of the elements. Then, each worker repeats this recursively.
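The uneven splitting described in the comments above can be observed directly with Spliterator.trySplit(); a small sketch (class name and element values are mine, chosen arbitrarily):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;

public class SplitDemo
{
    // Splits the set's spliterator once and counts the elements on each
    // side, the way a parallel stream would hand off work to two threads.
    static long[] splitCounts(Set<Integer> set)
    {
        Spliterator<Integer> left = set.spliterator();
        // trySplit() hands off roughly half of the backing ARRAY, not half
        // of the ELEMENTS; with 5 elements in a 16-slot table, one side can
        // easily end up with most (or all) of the elements.
        Spliterator<Integer> right = left.trySplit();

        long[] counts = new long[2];
        left.forEachRemaining(i -> counts[0]++);
        if (right != null)
            right.forEachRemaining(i -> counts[1]++);
        return counts;
    }

    public static void main(String[] args)
    {
        long[] c = splitCounts(new HashSet<>(List.of(2, 3, 4, 5, 6)));
        System.out.println("left half:  " + c[0] + " elements");
        System.out.println("right half: " + c[1] + " elements");
        // No element is lost, but the two counts are often far from 3/2.
    }
}
```

An ArrayList's spliterator, by contrast, always splits into equal, fully populated index ranges, which is why the copy-to-ArrayList work-around restores parallelism.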
