When I run the following code, only 2 of the 8 available threads are used. Can anyone explain why? How can I change the code so that it takes advantage of all 8 threads?
Tree.java:
```java
package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) &&
                Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" +
                "data=" + data +
                ", subTrees=" + subTrees +
                '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//                .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//                .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}
```

(It doesn't matter whether I use `forEach` or `reduce`.)
Main.java:
```java
package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());

        final Tree<Integer> root = new Tree<>(null,
                Set.of(new Tree<>(1,
                        IntStream.range(2, 7)
                                .boxed()
                                .map(Tree::new)
                                .collect(Collectors.toSet()))));

        root.sendCommandAll();

//        IntStream.generate(() -> 1)
//                .parallel()
//                .forEach(i ->
//                {
//                    System.out.println(Thread.currentThread().getName());
//                    try
//                    {
//                        Thread.sleep(5000);
//                    } catch (InterruptedException e)
//                    {
//                        e.printStackTrace();
//                    }
//                });
    }
}
```

In the `main` method I create a tree with the following structure:
```
root (data is null)
|- 1
   |- 2
   |- 3
   |- 4
   |- 5
   |- 6
```

`sendCommandAll` processes every subtree (in parallel) only after its parent has finished being processed, but the result is as follows:
```
Processors: 8
[main] sending command to 1
[main] tree with data 1 got true
[main] sending command to 6
[ForkJoinPool.commonPool-worker-2] sending command to 5
[main] tree with data 6 got true
[ForkJoinPool.commonPool-worker-2] tree with data 5 got true
[ForkJoinPool.commonPool-worker-2] sending command to 4
[ForkJoinPool.commonPool-worker-2] tree with data 4 got true
[ForkJoinPool.commonPool-worker-2] sending command to 3
[ForkJoinPool.commonPool-worker-2] tree with data 3 got true
[ForkJoinPool.commonPool-worker-2] sending command to 2
[ForkJoinPool.commonPool-worker-2] tree with data 2 got true
```
(For the record, when I run the commented-out code in `Main.java`, the JVM uses all 7 (+ 1) threads available in the common pool.)
How can I improve my code?
Related question: "ForkJoinPool, with parallelism of 20, only 2 threads are active"

`HashMap`s (and in turn `HashSet`s) with a small number of elements compared to their (default) capacity may distribute their work badly, depending on the hash code distribution: their spliterator splits the range of hash buckets rather than the elements themselves, so when the entries are clustered in a few buckets, most of the resulting chunks are empty. You can work around this using `new ArrayList<>(subTrees).parallelStream()`, but there are other flaws in your approach, like doing the work/wait before even starting to traverse the children. You should separate the iteration logic from the actual action.
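A minimal sketch of the `ArrayList` workaround combined with separating the iteration logic from the action, assuming Java 9+ (for `Set.of`, as in the question). `TreeTraversalSketch`, `forEachTopDown` and `visitOrder` are hypothetical names introduced only for illustration, and the nested `Tree` is a trimmed copy of the question's class without `equals`/`hashCode`/`toString`. The per-node work is passed in as a `Consumer`, so the traversal itself only guarantees that a parent's action runs before its children are visited in parallel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TreeTraversalSketch
{
    // Trimmed-down stand-in for the question's Tree<T> (equals/hashCode/toString omitted).
    static final class Tree<T>
    {
        private final T data;
        private final Set<Tree<T>> subTrees;

        Tree(T data, Set<Tree<T>> subTrees)
        {
            this.data = data;
            this.subTrees = subTrees;
        }

        Tree(T data)
        {
            this(data, new HashSet<>());
        }

        // Hypothetical helper: runs `action` on this node's data (if any) and only
        // then visits the subtrees in parallel. The traversal does not care what
        // the action does.
        void forEachTopDown(Consumer<? super T> action)
        {
            if (data != null)
                action.accept(data);
            // Copying into an ArrayList lets the parallel stream split by element
            // index instead of by HashSet bucket ranges.
            new ArrayList<>(subTrees).parallelStream()
                    .forEach(child -> child.forEachTopDown(action));
        }
    }

    // Builds the question's tree (root -> 1 -> {2..6}) and returns the order in
    // which the action saw each node's data.
    static List<Integer> visitOrder()
    {
        Set<Tree<Integer>> leaves = IntStream.range(2, 7)
                .boxed()
                .map(Tree::new)
                .collect(Collectors.toSet());
        Tree<Integer> root = new Tree<>(null, Set.of(new Tree<>(1, leaves)));

        List<Integer> visited = Collections.synchronizedList(new ArrayList<>());
        root.forEachTopDown(data ->
        {
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
            try
            {
                Thread.sleep(200); // stand-in for the real, blocking command
            } catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
            visited.add(data);
        });
        return visited;
    }

    public static void main(String[] args)
    {
        System.out.println("visit order: " + visitOrder());
    }
}
```

With the `ArrayList` copy, the five leaves are typically split into single-element chunks, so they can run concurrently when the common pool has enough threads; thread names and the relative order of 2..6 vary between runs, while 1 is always visited first. The action still blocks a pool thread while it sleeps, so for long blocking I/O a dedicated `ExecutorService` may be a better fit than the common pool.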