I am using RxJava for maintenance tasks in the following way:
In a class that needs regular maintenance, I use the following static subscription. The Observable is launched for the first time when the class is loaded into memory, and then at regular intervals as specified.
private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override public Observable<String> call(Long aLong) {
                // some code
                return Observable.just(null);
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe();
Now I have a situation where I want to report the result of my maintenance to the UI.
Normally, I would use the following pattern:
Observable.create(new Observable.OnSubscribe<String>() {
            @Override public void call(Subscriber<? super String> subscriber) {
                // some code
                subscriber.onNext(result);
                subscriber.onCompleted();
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<String>() {
            @Override public void call(String result) {
                // write to the UI
            }
        });
but here we have an Observable which is executed just once.
For an Observable which is executed at regular intervals, I couldn't find a way to subscribe with an Action so that I can pass the result to it using subscriber.onNext(). It looks like there is no suitable signature that both takes the Long from timer() and allows subscribing with an Action. But knowing RxJava, I am sure there is a trick ;-)
I could make this work using zip to combine a timer Observable and a one-time Observable (basically zipping the two versions together), but I would rather use the first structure because it behaves slightly differently.
--
I tried to merge both versions into one in the following manner:
private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS)
        .flatMap(new Func1<Long, Observable<String>>() {
            @Override public Observable<String> call(Long aLong) {
                // some code // stays here to ensure there is no concurrency while executing
                final String result = "result"; // I store the result in a final variable after some code has been finished
                return Observable.create(new Observable.OnSubscribe<String>() {
                    @Override public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext(result); // then I use it in a new Observable and emit it
                        subscriber.onCompleted(); // not sure if this is needed here (haven't tested this yet)
                    }
                });
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread()) // main thread, as in the one-shot version, so the Action1 below can touch the UI
        .subscribe(new Action1<String>() {
            @Override public void call(String result) {
                // so I can finally consume the result on the UI thread
            }
        });
Instead of creating and emitting a "null" Observable, I create one which allows me to send a result to the subscriber.
Pretty messy, but this should work, right? Any simpler solution? What are your thoughts?
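To make the intended flow concrete (run the maintenance on a background thread at a fixed interval, then hand each result to a consumer on a different thread), here is a minimal library-free sketch. It is not RxJava: a ScheduledExecutorService stands in for Observable.timer plus subscribeOn, and a single-threaded Executor stands in for the observeOn scheduler. The names PeriodicMaintenance, start and collect are made up for illustration, and the "result-N" strings are placeholder results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of RxJava or Android.
final class PeriodicMaintenance {

    // Runs `task` immediately and then every `periodMillis` on `worker`,
    // and passes each result to `onResult` on `resultExecutor`.
    static <T> ScheduledFuture<?> start(ScheduledExecutorService worker,
                                        Executor resultExecutor,
                                        long periodMillis,
                                        Supplier<T> task,
                                        Consumer<T> onResult) {
        return worker.scheduleAtFixedRate(() -> {
            // With a single-threaded worker, two runs never overlap.
            T result = task.get();
            // Hop to the consumer's thread before delivering the result.
            resultExecutor.execute(() -> onResult.accept(result));
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Demo: collects the first `count` results, then stops the timer.
    static List<String> collect(int count, long periodMillis) {
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        ExecutorService ui = Executors.newSingleThreadExecutor(); // stand-in for the UI thread
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        AtomicInteger runs = new AtomicInteger();

        ScheduledFuture<?> handle = PeriodicMaintenance.<String>start(
                worker, ui, periodMillis,
                () -> "result-" + runs.incrementAndGet(), // placeholder for "some code"
                result -> {
                    // Runs on the `ui` thread only, so this check is sequential.
                    if (received.size() < count) {
                        received.add(result);
                        done.countDown();
                    }
                });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handle.cancel(false); // comparable to unsubscribing
            worker.shutdownNow();
            ui.shutdownNow();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(collect(3, 100));
    }
}
```

The point of the sketch is only the shape: the periodic work and the result delivery are two separate steps on two separate threads, which is what the timer plus flatMap plus observeOn chain is meant to express.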
Observable.timer.flatMap doesn't work? - zsxwing