3

I was looking at this link for lazy stream construction and tried using it for one of my cases.

My primary stream has some operations that are needed to be done on Stream.onClose().

In my custom logic, I use Iterator from Stream.iterator() for stream processing.

This worked fine consuming the actual Stream. But, when I used Stream.flatMap() for constructing a lazy stream, onClose function gets called when I start iterating, which in-turn creates problems for me.

I tried this in zulu-opendjk 1.8.0_222 and 13. I am facing this exception in both the environments.

enter image description here

You can reproduce the problem with the below code.

import java.util.*; import java.util.stream.Stream; import java.util.stream.StreamSupport; public class TestStreamIterator { public static void main(String args[]) { Stream<String> stream1 = getStream(); stream1.iterator().forEachRemaining(System.out::println); Stream<String> stream2 = Stream.of(1).flatMap(integer -> getStream()); stream2.iterator().forEachRemaining(System.out::println); } private static Stream<String> getStream() { List<String> values = Arrays.asList("a", "b", "c"); MyIterator iterator = new MyIterator(values); Stream<String> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL | Spliterator.IMMUTABLE), false).onClose(iterator::close); return stream; } private static class MyIterator implements Iterator<String>, AutoCloseable { private Iterator<String> iterator; public MyIterator(List<String> values) { iterator = values.iterator(); } @Override public boolean hasNext() { return iterator.hasNext(); } @Override public String next() { return iterator.next(); } @Override public void close() { throw new IllegalStateException("Should not come here"); } } } 

My understating is, when using flatMap; close method of Stream.of(1) only should be called. Not of the stream created inside flatMap function.

I was expecting the onClose function to be invoked only when the stream is closed. But, I am not sure where the stream is getting closed.

Any help on solving this case would also be helpful.

0

1 Answer 1

4

When you call flatMap(integer -> getStream()) here:

Stream<String> stream2 = Stream.of(1).flatMap(integer -> getStream()); stream2.iterator().forEachRemaining(System.out::println); 

You are calling this method:

Iterator

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element. Each mapped stream is closed after its contents have been placed into this stream. (If a mapped stream is null an empty stream is used, instead.)

So as the documentation says, the mapped stream you pass into this method (from getStream() which is a Stream over a MyIterator) will be closed, which it is, then (as defined in onClose of that Stream) it calls MyIterator.close() which throws the exception.


Addressing your comment since you don't seem to follow:

Stream<String> stream2 = Stream.of(1).flatMap(integer -> getStream()); 

Creates a stream that will lazily map to the contents of a sub-stream when you read it. When that sub-stream is loaded into the main stream the sub-stream will be closed.

stream2.iterator().forEachRemaining(System.out::println); 

You read from the main stream, which maps to the sub-stream, which reads all the sub-stream then closes the sub-stream which then calls Stream.onClose() which calls MyIterator.close()

Sign up to request clarification or add additional context in comments.

6 Comments

In my actual case, I have a resource opened in that iterator. So, if its closed, I cant access data that i want. Also, mapped stream is closed after its contents have been placed into this stream, I haven't accessed the data from the Stream using iterator. @xtratic
@suraj1291993, it seems that what should be closed are the resources created by the stream then, not the stream itself.
@DidierL I agree with you. Resources opened internally should be closed internally. To ensure that, the primary stream I return will have handling on onClose to close them. If this snippet is executed Stream.of(1).flatMap(integer -> getStream()).forEach(System.out::println), you can see that stream.close will be called after stream is consumed. i.e., all the elements in the stream are iterated and processed.
@suraj1291993 I have tried to explain further in my answer.
@suraj1291993 Incorrect. Only running Stream.of(1).flatMap(integer -> getStream()).forEach(System.out::println) nothing is printed, since the sub-stream closing throws the Exception.
|

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.