Question (score 13)

I have the following example data set, which I want to transform (reduce) with the Java Stream API by merging consecutive rows that have the same Direction value:

| Direction | int[]    |
|-----------|----------|
| IN        | 1, 2     |
| OUT       | 3, 4     |
| OUT       | 5, 6, 7  |
| IN        | 8        |
| IN        | 9        |
| IN        | 10, 11   |
| OUT       | 12, 13   |
| IN        | 14       |

into:

| Direction | int[]          |
|-----------|----------------|
| IN        | 1, 2           |
| OUT       | 3, 4, 5, 6, 7  |
| IN        | 8, 9, 10, 11   |
| OUT       | 12, 13         |
| IN        | 14             |

Code that I've written so far:

```java
enum Direction { IN, OUT }

class Tuple {
    Direction direction;
    int[] data;

    // constructor and getters, implied by the code below
    Tuple(Direction direction, int[] data) {
        this.direction = direction;
        this.data = data;
    }

    Direction getDirection() { return direction; }
    int[] getData() { return data; }

    public Tuple merge(Tuple t) {
        return new Tuple(direction, concat(getData(), t.getData()));
    }
}

private static int[] concat(int[] first, int[] second) {
    int[] result = Arrays.copyOf(first, first.length + second.length);
    System.arraycopy(second, 0, result, first.length, second.length);
    return result;
}

// tupleStream is a Stream<Tuple>
List<Tuple> reduce = tupleStream.reduce(new ArrayList<>(), WDParser::add, WDParser::combine);

private static List<Tuple> combine(List<Tuple> list1, List<Tuple> list2) {
    System.out.println("combine");
    list1.addAll(list2);
    return list1;
}

private static List<Tuple> add(List<Tuple> list, Tuple t) {
    System.out.println("add");
    if (list.size() == 0) {
        list.add(t);
    } else if (list.size() > 0) {
        int lastIndex = list.size() - 1;
        Tuple last = list.get(lastIndex);
        if (last.getDirection() == t.getDirection())
            list.set(lastIndex, last.merge(t));
        else
            list.add(t);
    }
    return list;
}
```

I suspect there is a better and simpler way to achieve the same result.

The online examples and blog posts I've found on Stream reduce/combine only use Integer::sum. I'm hoping to learn how to apply them to more complex scenarios.


5 Answers

Answer 1 (score 5)

I think your solution is pretty nice already, especially since a reduction makes parallelism easy compared to collecting into a shared outside container. But, as Holger pointed out, it's easier (and correct) to use collect instead of reduce here, because your accumulator and combiner modify the lists passed to them. Furthermore, the conditions in the accumulator can be simplified a bit, and you forgot to merge the last element of the first list with the first element of the second list in the combiner:

```java
List<Tuple> reduce = tupleStream.collect(ArrayList::new, WDParser::add, WDParser::combine);

private static List<Tuple> combine(List<Tuple> list1, List<Tuple> list2) {
    if (!list2.isEmpty()) {
        add(list1, list2.remove(0)); // merge lists in the middle if necessary
        list1.addAll(list2);         // add all the rest
    }
    return list1;
}

private static List<Tuple> add(List<Tuple> list, Tuple t) {
    int lastIndex = list.size() - 1;
    if (list.isEmpty() || list.get(lastIndex).getDirection() != t.getDirection()) {
        list.add(t);
    } else {
        list.set(lastIndex, list.get(lastIndex).merge(t));
    }
    return list;
}
```

Instead of using indexes to access the first and last elements, you could also use a LinkedList and its addLast(), getLast(), removeFirst() and removeLast() methods.
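A minimal runnable sketch of that LinkedList variant. It assumes a small immutable Tuple with a varargs constructor and a toString (both are assumptions, not the question's exact class) and uses a hypothetical class name, LinkedListMerge. Running the question's example data through a parallel stream exercises the combiner as well as the accumulator:

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Stream;

enum Direction { IN, OUT }

// Assumed minimal immutable Tuple (not the question's exact class)
final class Tuple {
    private final Direction direction;
    private final int[] data;

    Tuple(Direction direction, int... data) {
        this.direction = direction;
        this.data = data;
    }

    Direction getDirection() { return direction; }
    int[] getData() { return data; }

    // Returns a new Tuple; neither input is modified
    Tuple merge(Tuple t) {
        int[] result = Arrays.copyOf(data, data.length + t.data.length);
        System.arraycopy(t.data, 0, result, data.length, t.data.length);
        return new Tuple(direction, result);
    }

    @Override
    public String toString() { return direction + " " + Arrays.toString(data); }
}

class LinkedListMerge {
    // Accumulator: merge into the last tuple if the direction matches, else append
    static void add(LinkedList<Tuple> list, Tuple t) {
        if (!list.isEmpty() && list.getLast().getDirection() == t.getDirection()) {
            list.addLast(list.removeLast().merge(t));
        } else {
            list.addLast(t);
        }
    }

    // Combiner: merge the boundary tuples, then append the rest of the right list
    static void combine(LinkedList<Tuple> left, LinkedList<Tuple> right) {
        if (!right.isEmpty()) {
            add(left, right.removeFirst());
            left.addAll(right);
        }
    }

    static List<Tuple> squash(Stream<Tuple> tuples) {
        LinkedList<Tuple> result =
                tuples.collect(LinkedList::new, LinkedListMerge::add, LinkedListMerge::combine);
        return result;
    }

    public static void main(String[] args) {
        List<Tuple> input = List.of(
                new Tuple(Direction.IN, 1, 2), new Tuple(Direction.OUT, 3, 4),
                new Tuple(Direction.OUT, 5, 6, 7), new Tuple(Direction.IN, 8),
                new Tuple(Direction.IN, 9), new Tuple(Direction.IN, 10, 11),
                new Tuple(Direction.OUT, 12, 13), new Tuple(Direction.IN, 14));
        System.out.println(squash(input.parallelStream()));
        // prints [IN [1, 2], OUT [3, 4, 5, 6, 7], IN [8, 9, 10, 11], OUT [12, 13], IN [14]]
    }
}
```

Because the combiner only has to look at the last element of the left list and the first element of the right one, the deque-style methods make the boundary merge explicit without any index arithmetic.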


Comments

This way of using reduce will break badly with parallel execution. The functions passed to reduce should never modify the incoming objects. This kind of operation should be implemented using collect. See Mutable Reduction
@Holger I thought about it earlier, but only found the issues now that you pointed them out again. I've changed it to a collect. Is that enough, or do I have to create new lists in the accumulator and combiner as well?
You don’t need to create new lists in the accumulator or combiner; that’s the whole point of collectors. The only thing missing is that the combiner should be able to handle an empty list2 argument as well (this may happen when you collect after a filter).
Thanks, I'll check the solution later today and update.
@Amitoj the combiner is only called for parallel streams. Simply speaking, the stream is split into chunks in that case. Each chunk is collected using the "accumulator" (add). Afterwards, these partial results are "combined" into the final result. For a sequential stream there is only one part (the whole stream), hence there is no need to use the combiner.
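For contrast, reduce itself can be used safely if the accumulator and combiner never modify their arguments and always return new lists. A minimal sketch, assuming the same kind of minimal immutable Tuple as in the answer's discussion (redefined here so the block compiles on its own) and a hypothetical class name, ImmutableReduce. Copying the list for every element makes this quadratic, which is exactly why collect is the better fit:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

enum Direction { IN, OUT }

// Assumed minimal immutable Tuple (not the question's exact class)
final class Tuple {
    private final Direction direction;
    private final int[] data;

    Tuple(Direction direction, int... data) {
        this.direction = direction;
        this.data = data;
    }

    Direction getDirection() { return direction; }
    int[] getData() { return data; }

    Tuple merge(Tuple t) {
        int[] result = Arrays.copyOf(data, data.length + t.data.length);
        System.arraycopy(t.data, 0, result, data.length, t.data.length);
        return new Tuple(direction, result);
    }

    @Override
    public String toString() { return direction + " " + Arrays.toString(data); }
}

class ImmutableReduce {
    // Accumulator: builds and returns a new list; the input list is left untouched
    static List<Tuple> add(List<Tuple> list, Tuple t) {
        List<Tuple> result = new ArrayList<>(list);
        int last = result.size() - 1;
        if (last >= 0 && result.get(last).getDirection() == t.getDirection()) {
            result.set(last, result.get(last).merge(t));
        } else {
            result.add(t);
        }
        return result;
    }

    // Combiner: merges the boundary tuples via add(), which returns a fresh copy
    static List<Tuple> combine(List<Tuple> left, List<Tuple> right) {
        if (right.isEmpty()) {
            return left;
        }
        List<Tuple> result = add(left, right.get(0));
        result.addAll(right.subList(1, right.size())); // result is our own copy
        return result;
    }

    static List<Tuple> squash(Stream<Tuple> tuples) {
        List<Tuple> identity = List.of(); // never modified, so it is safe to share
        return tuples.reduce(identity, ImmutableReduce::add, ImmutableReduce::combine);
    }
}
```

The empty identity satisfies combine(identity, x) == x, and combine is associative, which are the two properties reduce relies on when it splits the stream.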
Answer 2 (score 3)

How about this? First, define a small helper method:

```java
private static Tuple mergeTwo(Tuple left, Tuple right) {
    int[] leftArray = left.getData();
    int[] rightArray = right.getData();
    int[] result = new int[leftArray.length + rightArray.length];
    System.arraycopy(leftArray, 0, result, 0, leftArray.length);
    System.arraycopy(rightArray, 0, result, leftArray.length, rightArray.length);
    return new Tuple(left.getDirection(), result);
}
```

This is essentially your concat and merge rolled into a single method: it merges two Tuples into one, keeping the direction of the left one.
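The array part of mergeTwo can also be written with streams. A small sketch using IntStream.concat (the class name ArrayConcat is hypothetical); it goes through a stream pipeline, so the System.arraycopy version is likely faster for large arrays:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class ArrayConcat {
    // Concatenates two int arrays by streaming both and collecting into a new array
    static int[] concat(int[] left, int[] right) {
        return IntStream.concat(Arrays.stream(left), Arrays.stream(right)).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(concat(new int[] {3, 4}, new int[] {5, 6, 7})));
        // prints [3, 4, 5, 6, 7]
    }
}
```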

Then add a helper method that produces the needed Collector; you can put it into a utility class so that it can be reused:

```java
private static Collector<Tuple, ?, List<Tuple>> mergedTuplesCollector() {
    class Acc {
        ArrayDeque<Tuple> deque = new ArrayDeque<>();

        // append at the tail, or merge with the last tuple if the direction matches
        void add(Tuple elem) {
            Tuple last = deque.peekLast();
            if (last == null || last.getDirection() != elem.getDirection()) {
                deque.offerLast(elem);
            } else {
                deque.offerLast(mergeTwo(deque.pollLast(), elem));
            }
        }

        // merge the boundary tuples if needed (either side may be empty),
        // then append everything that is left in right
        Acc merge(Acc right) {
            Tuple lastLeft = deque.peekLast();
            Tuple firstRight = right.deque.peekFirst();
            if (lastLeft != null && firstRight != null
                    && lastLeft.getDirection() == firstRight.getDirection()) {
                deque.offerLast(mergeTwo(deque.pollLast(), right.deque.pollFirst()));
            }
            deque.addAll(right.deque);
            return this;
        }

        public List<Tuple> finisher() {
            return new ArrayList<>(deque);
        }
    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}
```

And usage would be, for example:

```java
List<Tuple> merged = tuples.stream()
        .parallel()
        .collect(mergedTuplesCollector());
```

Answer 3 (score 1)

This is an alternative approach that uses slightly different data structures.

If this is an option, changing from int[] to List<Integer> allows for more flexibility (not to mention avoiding creating/copying arrays multiple times):

```java
class Tuple {
    Direction direction;
    List<Integer> data;
}
```

And the following function does the merging on a Deque collection:

```java
private static List<Integer> next(Deque<Tuple> t, Direction d) {
    if (!t.isEmpty() && t.peekLast().getDirection() == d) {
        return t.peekLast().getData();
    } else {
        Tuple next = new Tuple();
        next.direction = d;
        next.data = new ArrayList<>();
        t.addLast(next);
        return next.data;
    }
}
```

And with that, the stream can look as simple as:

```java
Deque<Tuple> deq = new LinkedList<>(); // the final collection of tuples
tuples.stream()
      .flatMap(tp -> tp.getData().stream()
                       .map(d -> Pair.of(tp.getDirection(), Integer.valueOf(d))))
      .forEach(el -> next(deq, el.getLeft()).add(el.getRight()));
```
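If you want to avoid the Pair dependency and make the ordering requirement explicit, here is a minimal JDK-only sketch of the same idea: Map.entry (Java 9+) stands in for Pair, and forEachOrdered replaces forEach, as the comments on this answer suggest. The class name FlatMapMerge and the tuple(...) helper are hypothetical; the deque is shared mutable state, so this is meant for sequential streams:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

enum Direction { IN, OUT }

// Tuple with List<Integer> data, as suggested in this answer (getters assumed)
class Tuple {
    Direction direction;
    List<Integer> data;

    Direction getDirection() { return direction; }
    List<Integer> getData() { return data; }

    @Override
    public String toString() { return direction + " " + data; }
}

class FlatMapMerge {
    // Returns the list to append to: the last tuple's data if the direction matches,
    // otherwise the data list of a fresh tuple appended to the deque
    static List<Integer> next(Deque<Tuple> t, Direction d) {
        if (!t.isEmpty() && t.peekLast().getDirection() == d) {
            return t.peekLast().getData();
        }
        Tuple next = new Tuple();
        next.direction = d;
        next.data = new ArrayList<>();
        t.addLast(next);
        return next.data;
    }

    static Deque<Tuple> squash(List<Tuple> tuples) {
        Deque<Tuple> deq = new ArrayDeque<>();
        tuples.stream()
              // one (direction, value) entry per element of every tuple
              .flatMap(tp -> tp.getData().stream().map(v -> Map.entry(tp.getDirection(), v)))
              // forEachOrdered guarantees encounter order, which this approach depends on
              .forEachOrdered(e -> next(deq, e.getKey()).add(e.getValue()));
        return deq;
    }

    // Hypothetical helper for building input tuples
    static Tuple tuple(Direction d, Integer... values) {
        Tuple t = new Tuple();
        t.direction = d;
        t.data = List.of(values);
        return t;
    }
}
```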

Comments

I think it's safer to use forEachRemaining instead of forEach here. The docs say that forEach is not deterministic about the order in which the elements are processed, and as far as I can see this could lead to inconsistencies with your approach.
@MalteHartwig I think you meant forEachOrdered :)
@FedericoPeraltaSchaffner hahaha, yes, confused it with Spliterator
Thanks, I'm using int[] because it's easier to convert to and from a Netty ByteBuf object.
Interesting use of stream flatMap, I'll keep this for future reference.
Answer 4 (score 0)

I've got two ideas on this topic. The first one is to get the indices, as in this answer, and group the elements accordingly.
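The index idea might look roughly like this: compute the indices at which the direction changes, then turn each pair of neighbouring indices into one merged Tuple. This is a sketch under assumptions (a random-access List<Tuple> input, a minimal Tuple with a varargs constructor, and a hypothetical class name, IndexGrouping), not the code from the answer referred to above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

enum Direction { IN, OUT }

// Assumed minimal immutable Tuple (not the question's exact class)
final class Tuple {
    private final Direction direction;
    private final int[] data;

    Tuple(Direction direction, int... data) {
        this.direction = direction;
        this.data = data;
    }

    Direction getDirection() { return direction; }
    int[] getData() { return data; }

    @Override
    public String toString() { return direction + " " + Arrays.toString(data); }
}

class IndexGrouping {
    static List<Tuple> squash(List<Tuple> tuples) {
        int n = tuples.size();
        // Run boundaries: 0, every index whose direction differs from its predecessor, and n
        int[] bounds = IntStream.rangeClosed(0, n)
                .filter(i -> i == 0 || i == n
                        || tuples.get(i).getDirection() != tuples.get(i - 1).getDirection())
                .toArray();
        // Each half-open range [bounds[k], bounds[k + 1]) is one run of equal directions
        return IntStream.range(0, bounds.length - 1)
                .mapToObj(k -> new Tuple(
                        tuples.get(bounds[k]).getDirection(),
                        tuples.subList(bounds[k], bounds[k + 1]).stream()
                                .flatMapToInt(t -> Arrays.stream(t.getData()))
                                .toArray()))
                .collect(Collectors.toList());
    }
}
```

Because every run is computed from indices alone, both stages can run in parallel without a custom combiner; the price is that the input must be a List rather than an arbitrary Stream.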

The second idea: if you already have a Stream, a custom Collector should be used (similar to the other solutions, though based on a Deque):

```java
private Collector<Tuple, ?, List<Tuple>> squashTuples() {
    return new Collector<Tuple, Deque<Tuple>, List<Tuple>>() {
        @Override
        public Supplier<Deque<Tuple>> supplier() {
            return ArrayDeque::new;
        }

        @Override
        public BiConsumer<Deque<Tuple>, Tuple> accumulator() {
            return (acc, e) -> {
                Objects.requireNonNull(e);
                if (!acc.isEmpty() && acc.peekLast().getDirection() == e.getDirection()) {
                    acc.offerLast(acc.pollLast().merge(e));
                } else {
                    acc.offerLast(e);
                }
            };
        }

        @Override
        public BinaryOperator<Deque<Tuple>> combiner() {
            return (left, right) -> {
                if (!left.isEmpty() && !right.isEmpty()
                        && left.peekLast().getDirection() == right.peekFirst().getDirection()) {
                    left.offerLast(left.pollLast().merge(right.pollFirst()));
                }
                left.addAll(right);
                return left;
            };
        }

        @Override
        public Function<Deque<Tuple>, List<Tuple>> finisher() {
            return ArrayList::new;
        }

        @Override
        public Set<Characteristics> characteristics() {
            return EnumSet.noneOf(Characteristics.class);
        }
    };
}
```

Comments

I don't think that a combiner needs checks for isEmpty in this case.
@Eugene I think you're basically right. There should not be a merging step with an empty container, but are there any guarantees? I cannot see any in the documentation.
Well, considering that an accumulator will always be called before a combiner, it seems that this is always so; but I wonder whether this is documented as such... I'll try to find out.
Answer 5 (score 0)

Here's a solution that uses the groupRuns method of the StreamEx library to group the stream into runs of consecutive elements with the same direction value. It then merges each run into a single new Tuple.

```java
List<Tuple> result = StreamEx.of(list)
        .groupRuns((t1, t2) -> t1.getDirection() == t2.getDirection())
        .map(ts -> new Tuple(
                ts.get(0).getDirection(), // each run is a non-empty List<Tuple>
                ts.stream().flatMapToInt(t -> Arrays.stream(t.getData())).toArray()))
        .toList();
```
