2

The simplest example would be a stream of strings like this:

["3", "a", "b", "c", "1", "a", "2", "a", "b"] 

The ones that are numbers describe how many more elements should it's group contain.

Very important that the stream is continuous so we cant just wait for the next number to split the stream.

As far as I know there is no built in functionality for this in RXJava2

var flowable = Flowable.concat(Flowable.fromArray("3", "a", "b", "c", "1", "a", "2", "a", "b"), Flowable.never()); flowable/*Something here*/.blockingSubscribe(System.out::println); 

And the expected output would be:

[3, a, b, c] [1, a] [2, a, b] 

1 Answer 1

2

I've later found akarnokd's RxJava2Extensions package. Using that, I was able to construct this, which does what I want:

 var flowable = Flowable.concat(Flowable.fromArray("3", "a", "b", "c", "1", "a", "2", "a", "b"), Flowable.never()); flowable.compose(FlowableTransformers.bufferUntil(new Predicate<>() { private int remaining = 0; @Override public boolean test(String next) { if(next.chars().allMatch(Character::isDigit)) { remaining = Integer.parseInt(next); } return --remaining < 0; } })).blockingSubscribe(System.out::println); 
Sign up to request clarification or add additional context in comments.

1 Comment

A word of caution, the Flowable you have constructed is not reusable. To be reusable the remaining field needs to be by-subscription. The defer operator gives you what you need: Flowable.defer(() -> flowable.compose(...))

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.