I have multiple modules that return Observables: O1, O2, O3, ..., On. The results of all modules should be combined into one observable, Ocomb, so that individual tasks can fail without terminating or otherwise affecting the combination. My current solution runs into several problems, as the following example shows:

This code combines the output of my modules:

    public Observable<Data> getModuleData() {
        List<Observable<Data>> tasks = new ArrayList<>();
        for (MyModule module : modules) {
            tasks.add(module.getData());
        }
        return Observable
                .mergeDelayError(Observable.from(tasks))
                .onBackpressureBuffer(MAX_BUFFER)
                .observeOn(AndroidSchedulers.mainThread());
    }

Now I want to display one attribute, e.g. 'name', of all emitted data objects:

    public List<String> getNames() {
        return getModuleData()
                .map(new Func1<Data, String>() {
                    @Override
                    public String call(Data data) {
                        return data.getName();
                    }
                })
                .timeout(600, TimeUnit.MILLISECONDS)
                .toList()
                .toBlocking()
                .firstOrDefault(new ArrayList<String>());
    }

The getNames() method should return a list and therefore block the execution.

Problem 1: There seems to be an issue in RxJava: if I call observeOn() and then make the observable blocking, the call never returns, regardless of what the timeout says.

Problem 2: If observeOn() is removed, the code works, but elsewhere in the app I render the results of the non-blocking observable in the UI. The data is displayed, but afterwards my UI misbehaves: I have to touch my UI list component to refresh the screen every time the data changes.

Problem 3: Some of the modules might produce internal errors or never call onCompleted(). I thought that a combination of mergeDelayError() and timeout() could handle these cases and call onCompleted() for unresponsive modules. However, if one of the modules does not call onCompleted() and the timeout() statement is removed, the blocking call never returns.

Questions:

What is the best way to combine multiple observables so that an individual observable can fail, with the failure handled as onCompleted() or ignored, without affecting the combined observable?

What is the best way to make the combined observable blocking and handle the timeout without stopping the execution or ending up in a loop?

  • A few questions. Why don't you simply return an Observable<List<String>> for getNames and subscribe to that (in short why are you blocking)? Why are you forcing a timeout? Commented May 18, 2016 at 10:40
  • I'm refactoring and wanted to keep the interface. The timeout was meant to terminate unresponsive observables. Commented May 18, 2016 at 11:03
  • If you need onNext from all the successful observables and don't care about onCompleted, I think mergeDelayError() is the best shot (there is an issue with the combination of mergeDelayError() and observeOn(): github.com/grails/grails-core/issues/10114). If you need onCompleted, you can use onErrorReturn, but then errors will also be handled in the onNext and onCompleted callbacks. Commented Jan 9, 2017 at 16:29

1 Answer

What is the best way to combine multiple observables so that an individual observable can fail, with the failure handled as onCompleted() or ignored, without affecting the combined observable?

    Observable.from(modules)
            .flatMap(MyModule::getData)
            .onErrorResumeNext(Observable.empty())
            .timeout(600, TimeUnit.MILLISECONDS, Observable.empty())
            .toList()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(System.out::println);

The stream above could be made blocking by adding toBlocking() before subscribe(), but that does not make much sense beyond testing.
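To make the "one source may fail or hang without affecting the rest" idea concrete, here is a minimal, library-free sketch. It deliberately uses the JDK's CompletableFuture (Java 9+ for completeOnTimeout) instead of RxJava, so it is an analogy rather than the stream above. The names ModuleCombiner and collectNames are hypothetical and not part of the original code. Each source gets its own timeout and its own error fallback, which roughly corresponds to applying the timeout and error handling to each module's observable inside flatMap rather than to the merged stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (an assumption for illustration, not from the original post).
final class ModuleCombiner {

    // Runs every source asynchronously and blocks until each one has either
    // produced a result, failed, or exceeded the per-source timeout.
    // A failing or unresponsive source contributes an empty list.
    static List<String> collectNames(List<Supplier<List<String>>> sources, long timeoutMs) {
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        for (Supplier<List<String>> source : sources) {
            futures.add(CompletableFuture
                    .supplyAsync(source)
                    // Unresponsive source: complete with an empty list after the timeout.
                    .completeOnTimeout(List.of(), timeoutMs, TimeUnit.MILLISECONDS)
                    // Failing source: replace the error with an empty list.
                    .exceptionally(error -> List.of()));
        }

        // Blocking part: join() cannot throw here because every future
        // has a fallback for both errors and timeouts.
        List<String> names = new ArrayList<>();
        for (CompletableFuture<List<String>> future : futures) {
            names.addAll(future.join());
        }
        return names;
    }
}
```

Note that a timed-out source is not cancelled in this sketch; its work keeps running in the background and its late result is simply ignored.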

8 Comments

I like the flatMap approach (I had this before and ran into problems with it as well). However, beyond syntactic sugar, it's not really adding or changing that much, is it? Module combination and consumption are separated, and it seems this would also cause problem 1. The onErrorResumeNext looks like a nice alternative to mergeDelayError that I will test now.
Keep in mind that timeout is applied to the stream of data; if you want to apply it to every Observable returned by getData(), move timeout() inside the flatMap function.
Thank you! I commented before I saw the edit. I think this little change brings it very close to a solution.
Have you left out the backpressure statement on purpose?
onErrorResumeNext is not an alternative to mergeDelayError. It's simply a way of handling errors.