
I am relatively new to RxJava 2 and have hit a wall. I need to split one stream into two streams based on a condition, then concat them.

Example: I have a network request (Retrofit) returning a Single. The MyResponse object contains a list of Procedures, which are obtained and executed by:

    .map(MyResponse::getProcedures)
    .flatMap(Observable::fromIterable)
    .flatMap(Procedure::execute)

This part works: every procedure created from the JSON is properly called and executed. However, some of those procedures have dependencies, e.g. one procedure uploads a file to a server and another disables Wi-Fi. Disabling Wi-Fi must happen after the other procedure; otherwise it will prevent the upload from executing.

I have been playing with RxJS Marbles to find the proper operator, but all of them turn one or two streams into one. In my case, it should be one stream into two.

1 - One solution would be to call an ordering method on the collection to sort the procedures properly and then execute them normally.

2 - Is there a way to split the stream on a specific condition into 2 distinct streams (network ops/hardware ops, whatever)? Something like this:

    .split(Procedure::hasPriority) // return true/false
    .concat(splitTrue, splitFalse)
    // then execute them
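For comparison, option 1 can be sketched in plain Java as a stable sort on the priority flag, done before the list is turned into a stream. This is only a minimal sketch: the `Procedure` class below is a hypothetical stand-in with a `name` and a `hasPriority()` flag, and `PriorityOrder.prioritized` is an illustrative helper, not part of RxJava or of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for the Procedure type from the question.
final class Procedure {
    final String name;
    final boolean priority;

    Procedure(String name, boolean priority) {
        this.name = name;
        this.priority = priority;
    }

    boolean hasPriority() {
        return priority;
    }
}

final class PriorityOrder {
    // Returns a new list with priority procedures first. List.sort is stable,
    // so procedures with the same flag keep their original relative order.
    static List<Procedure> prioritized(List<Procedure> procedures) {
        List<Procedure> ordered = new ArrayList<>(procedures);
        ordered.sort(Comparator.comparing(Procedure::hasPriority).reversed());
        return ordered;
    }

    public static void main(String[] args) {
        List<Procedure> procedures = Arrays.asList(
                new Procedure("disableWifi", false),
                new Procedure("uploadFile", true),
                new Procedure("reboot", false));
        // Prints uploadFile, disableWifi, reboot (one per line).
        for (Procedure p : prioritized(procedures)) {
            System.out.println(p.name);
        }
    }
}
```

The sorted list could then be handed to `Observable.fromIterable` and executed with an order-preserving operator such as `concatMap`, assuming each procedure must finish before the next one starts.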

Thanks

@Lukasz Thx, I tested your proposition with a simplified case:

    Observable.just(1, 2, 3, 4, 5, 6)
        .publish(f -> Observable.concat(
            f.filter(this::isEven).map(this::printEven),
            f.filter(this::isOdd).map(this::printOdd)
        ))
        .subscribe();

It prints: 2 is even, 4 is even, 6 is even, 1 is odd, 3 is odd, 5 is odd which was kind of the expected behavior.

In my case, ordering the procedures based on a simple flag would be too basic, because each procedure would be evaluated independently of the others. I will go with reordering the whole list and implement something that takes the dependencies between procedures into account.
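One possible shape for such a dependency-aware reordering is a topological sort (Kahn's algorithm). The sketch below is an assumption about how it could look, not the implementation that was eventually used: procedures are represented only by hypothetical names, and `dependsOn` is an illustrative map from each procedure to the procedures that must run before it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DependencyOrder {
    // dependsOn maps each procedure name to the names that must run before it.
    // Every procedure must appear as a key (empty list means no dependencies).
    static List<String> order(Map<String, List<String>> dependsOn) {
        Map<String, Integer> pending = new LinkedHashMap<>();   // unmet dependency count
        Map<String, List<String>> dependents = new HashMap<>(); // reverse edges
        for (Map.Entry<String, List<String>> e : dependsOn.entrySet()) {
            pending.put(e.getKey(), e.getValue().size());
            for (String dep : e.getValue()) {
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        // Start with every procedure that has nothing to wait for.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : pending.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());
        }
        List<String> ordered = new ArrayList<>();
        while (!ready.isEmpty()) {
            String next = ready.poll();
            ordered.add(next);
            // Release procedures whose last unmet dependency was just scheduled.
            for (String d : dependents.getOrDefault(next, Collections.emptyList())) {
                if (pending.merge(d, -1, Integer::sum) == 0) ready.add(d);
            }
        }
        if (ordered.size() != pending.size()) {
            throw new IllegalStateException("cyclic or unknown dependency");
        }
        return ordered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dependsOn = new LinkedHashMap<>();
        dependsOn.put("disableWifi", Arrays.asList("uploadFile"));
        dependsOn.put("uploadFile", Collections.emptyList());
        dependsOn.put("readSensor", Collections.emptyList());
        System.out.println(order(dependsOn)); // [uploadFile, readSensor, disableWifi]
    }
}
```

Unlike a single boolean flag, this handles several levels of dependencies, and it fails fast if two procedures depend on each other.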

In the meantime, I am going back to the books; Rx is not that user-friendly for newcomers. Thx again

  • Since you already have the full list, sorting the tasks is a much simpler approach. Commented Jan 22, 2018 at 15:03
  • @akarnokd You are right. It will provide more control and will allow handling multiple levels of dependencies between the different procedures. Commented Jan 24, 2018 at 19:46

1 Answer


I think this can be done using the overload of the publish operator that takes a function. It lets you subscribe to the upstream observable as many times as you want and perform different operations on it, as long as you ultimately return a single observable from that function.

So I think in your case you would do something like

    .map(MyResponse::getProcedures)
    .flatMap(Observable::fromIterable)
    .publish(shared -> Observable.concat(
        shared.filter(Procedure::hasPriority).flatMap(Procedure::execute),
        shared.filter(p -> !p.hasPriority()).flatMap(Procedure::execute)
    ))

Hope this helps
