I have multiple modules that return Observables: O1, O2, O3, ..., On.
The results of all modules should be combined into one observable, Ocomb,
so that individual tasks can fail without terminating or otherwise affecting the combination.
With my current solution I'm running into several problems, illustrated by the following example.
This code combines the output of my modules:
public Observable<Data> getModuleData() {
    List<Observable<Data>> tasks = new ArrayList<>();
    for (MyModule module : modules) {
        tasks.add(module.getData());
    }
    return Observable
            .mergeDelayError(Observable.from(tasks))
            .onBackpressureBuffer(MAX_BUFFER)
            .observeOn(AndroidSchedulers.mainThread());
}
Now I want to display attribute X (e.g. 'name') of all emitted data objects:
public List<String> getNames() {
    return getModuleData()
            .map(new Func1<Data, String>() {
                @Override
                public String call(Data data) {
                    return data.getName();
                }
            })
            .timeout(600, TimeUnit.MILLISECONDS)
            .toList()
            .toBlocking()
            .firstOrDefault(new ArrayList<String>());
}
The getNames() method should return a list and therefore block the execution.
Problem 1
There seems to be an issue in RxJava: if I call observeOn() and then make the chain blocking, the call never returns, regardless of the timeout.
Problem 2
If observeOn() is removed, the code works, but elsewhere in the app I render the results of the non-blocking observable in the UI. The data is displayed, but afterwards the UI misbehaves: I have to touch the list component to refresh the screen every time the data changes.
Problem 3
Some of the modules might raise internal errors or never call onCompleted(). I thought that a combination of mergeDelayError() and timeout() could handle these cases and call onCompleted() for unresponsive modules. However, if one of the modules does not call onCompleted() and the timeout() statement is removed, the blocking call never returns.
Questions:
What is the best way to combine multiple observables so that an individual observable can fail, with the failure treated as onCompleted() (or ignored) and without affecting the combined observable?
What is the best solution to make the combined observable blocking and handle the timeout without stopping the execution or ending up in a loop?
mergeDelayError() is the best shot (there is an issue with the combination of mergeDelayError() and observeOn(): github.com/grails/grails-core/issues/10114). If you need onCompleted, you can use onErrorReturn, but then errors will also be handled in the onNext and onCompleted callbacks. – Tomas Bartalos
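One way to get the "a failed module just completes" behaviour asked about above is to shield every module before merging, rather than applying timeout() to the merged stream. The following is only a minimal sketch, assuming RxJava 1.x on the classpath (the version implied by Func1 and toBlocking() in the question). ModuleCombiner, isolate, combine and MODULE_TIMEOUT_MS are hypothetical names introduced for illustration, and String stands in for the question's Data type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

public class ModuleCombiner {

    // Hypothetical per-module limit; the question applies 600 ms to the merged stream instead.
    static final long MODULE_TIMEOUT_MS = 600;

    // Shields one source: any error, including the TimeoutException raised by
    // timeout(), is replaced by an empty sequence, i.e. a plain onCompleted().
    // Note that timeout() limits the gap before the first item and between items,
    // not the total running time of the source.
    static <T> Observable<T> isolate(Observable<T> source) {
        return source
                .subscribeOn(Schedulers.io())
                .timeout(MODULE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                .onErrorResumeNext(Observable.<T>empty());
    }

    // Merges the shielded sources. A plain merge() is enough here, because no
    // error can reach it any more.
    static <T> Observable<T> combine(List<Observable<T>> sources) {
        List<Observable<T>> shielded = new ArrayList<>();
        for (Observable<T> source : sources) {
            shielded.add(isolate(source));
        }
        return Observable.merge(shielded);
    }

    public static void main(String[] args) {
        List<Observable<String>> modules = Arrays.asList(
                Observable.just("a"),                                 // well-behaved module
                Observable.<String>error(new RuntimeException("x")),  // failing module
                Observable.<String>never(),                           // never calls onCompleted()
                Observable.just("b"));

        // Blocks until every shielded source has completed or timed out.
        List<String> names = new ArrayList<>(
                combine(modules).toList().toBlocking().single());
        Collections.sort(names); // emission order across threads is not deterministic
        System.out.println(names);
    }
}
```

Because each source is guaranteed to terminate, the blocking toList() returns even when a module never calls onCompleted(). On Android, the blocking variant would presumably have to run off the main thread and without observeOn(AndroidSchedulers.mainThread()) in the chain, since a blocked main thread cannot process the emissions scheduled onto it. The non-blocking UI path could still add observeOn() after combine().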