I use RxJava on Android with Retrofit 2, and I apply subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()) globally before subscribe(). Sometimes, however, I would like to call subscribeOn(Schedulers.immediate()) and observeOn(Schedulers.immediate()) to override the schedulers set earlier and get synchronous processing. But I found that it doesn't work: the work is still processed on the io() thread, and the result is still handled on mainThread(). Why?
1 Answer
That's just the way RxJava works.
Take a look at this video tutorial, starting at the 12:50 mark. So given the example in the video:
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
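What happens is that subscribeOn() calls nest. In this case subscribeOn(Schedulers.io()) is spawned first and subscribes everything above it on the io thread. But then subscribeOn(Schedulers.newThread()) is spawned next, and it takes priority (it is the call closest to the source, so it is the last one applied during subscription) and subscribes everything above it on a new thread instead. There is no building a chain of threads. In this example, you are essentially spawning the io thread for no good reason. The same applies to your case: a subscribeOn(Schedulers.immediate()) placed after the global subscribeOn(Schedulers.io()) cannot override it.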
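To make the nesting concrete, here is a minimal sketch that models it with plain java.util.concurrent executors instead of RxJava. The names SubscribeOnModel, Source and emittingThread are hypothetical, and the subscribeOn method here is a simplified stand-in, not RxJava's actual implementation. It only shows why the call closest to the source decides the emitting thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SubscribeOnModel {
    /** Hypothetical stand-in for an Observable: something you can subscribe to. */
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    /** Simplified model of subscribeOn: subscribe to the upstream on the given executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService scheduler) {
        return downstream -> scheduler.execute(() -> upstream.subscribe(downstream));
    }

    /** Builds source -> subscribeOn(newThread) -> subscribeOn(io) and reports the emitting thread. */
    static String emittingThread() throws Exception {
        ExecutorService newThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "newThread"));
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        try {
            // The source emits the name of whatever thread it was subscribed on.
            Source<String> source = downstream -> downstream.accept(Thread.currentThread().getName());

            // Same order as the example: newThread is declared first, io second.
            Source<String> chain = subscribeOn(subscribeOn(source, newThread), io);

            CompletableFuture<String> seen = new CompletableFuture<>();
            // Subscribing hops to "io" first, which then only hands off to "newThread".
            chain.subscribe(seen::complete);
            return seen.get(5, TimeUnit.SECONDS);
        } finally {
            newThread.shutdown();
            io.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(emittingThread()); // prints "newThread"
    }
}
```

In this model the io executor does nothing except forward the subscription to the newThread executor, which is the wasted hop described above.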
To better handle the subscribeOn() and observeOn() methods, I suggest you take a look at this post by the author of the video. What he proposes is to use a Transformer to wrap the calls to these methods:
Transformer is actually just Func1<Observable<T>, Observable<R>>. In other words: feed it an Observable of one type and it'll return an Observable of another. That's exactly the same as calling a series of operators inline.
This way, you can have a method like so:
<T> Transformer<T, T> applySchedulers() {
return observable -> observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
Or, if you want to reuse your transformers, you can have the following setup:
final Transformer schedulersTransformer =
observable -> observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
@SuppressWarnings("unchecked")
<T> Transformer<T, T> applySchedulers() {
return (Transformer<T, T>) schedulersTransformer;
}
Then the above example would look like:
Observable.just(1, 2, 3)
.compose(applySchedulers())
.subscribe(System.out::println);
Hope that helps.