I'm learning RxJava so please be gentle. I've watched the tutorials, done the reading, searched SO, however, I'm still having some problems transforming my AsyncTaskLoader. For some reason, I can't find a pattern of operators to achieve my task (although I think it's a common one). What I'm trying to do is the following: return an Observable my fragment could subscribe to. The observable should do the following on subscribe:

1) Fetch data from the local database by running 2 queries, applying some logic and returning the results;
2) Fetch data from the API;
3) Synchronise the new API data with the database;
4) Repeat step 1 and return the results.
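The four steps above can be modelled without any Rx machinery, which may make the required ordering easier to see. This is only a minimal plain-Java sketch: `ColdHotColdSketch`, `load`, `coldQuery`, `sync` and `demo` are hypothetical names standing in for the database query, the API call plus synchronisation, and the subscriber's `onNext`. In RxJava 1 the same ordering is commonly expressed by concatenating the cold observable, the hot observable with its items dropped (for example via `ignoreElements()`), and the cold observable again, so that a single subscriber sees both emissions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ColdHotColdSketch {

    /**
     * Emits the local result, runs the sync, then emits the local result again.
     * All three parameters are hypothetical stand-ins, not part of the original code.
     */
    public static <T> void load(Supplier<T> coldQuery, Runnable sync, Consumer<T> onNext) {
        onNext.accept(coldQuery.get()); // step 1: emit what the database has now
        sync.run();                     // steps 2-3: call the API and update the database
        onNext.accept(coldQuery.get()); // step 4: re-query and emit the refreshed data
    }

    /** Runs the flow against an in-memory "database" and records every emission. */
    public static List<List<String>> demo() {
        List<String> db = new ArrayList<>(Arrays.asList("cached"));
        List<List<String>> emissions = new ArrayList<>();

        Supplier<List<String>> coldQuery = () -> new ArrayList<>(db); // snapshot of the db
        Runnable sync = () -> db.add("fresh");                        // pretend API sync
        Consumer<List<String>> ui = emissions::add;                   // pretend subscriber

        load(coldQuery, sync, ui);
        return emissions;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[cached], [cached, fresh]]
    }
}
```

The single `onNext` consumer receives two results, the stale one first and the refreshed one after the sync, which is the behaviour the fragment needs from one subscription.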

So far I've transformed my db calls and my API calls to return observables. I'm trying to understand how I can emit the cold results and then continue with the chain. I could probably keep the two operations separate and use the same subscriber to subscribe to both? But I'm not sure how that would work if my new loader-replacement class returns an observable... Also, I don't really need to process the results from the second observable; I just need the first one to replay when the second one finishes.

So far I have the following:

 public Observable<StuffFetchResult> getColdStuff() {
    return Observable.zip(mDataSource.listStuff(), mDataSource.listOtherStuff(),
            (stuff, moreStuff) -> {
                List<Stuff> mergedList = new ArrayList<>();
                // do some merging stuff
                return new StuffFetchResult(mergedList);
            }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
 } 

Assume I also have getHotStuff() that will do the API call and the synchronisation with the database, if that's the right approach, and return the same Observable. However, I'm stuck on the next step - how can I restart the first observable to replay once hotStuff has completed, without adding another subscriber?

EDIT:

I've made some progress and I think all I need now is to join it all up. I have my two methods:

1) getColdStuff() is pretty much as described above
2) getHotStuff() will call the API, synchronise with the database, and return an Observable. The idea is to call getColdStuff() again after getHotStuff() has finished in order to refresh the UI, so the actual result returned from getHotStuff() can be ignored. All it needs to do is trigger getColdStuff() once it is done.

I've tried the suggestion in the answer and created the following:

BehaviorRelay<Observable<StuffFetchResult>> callSequence = BehaviorRelay.create();
Observable<StuffFetchResult> valueSequence = Observable.switchOnNextDelayError(callSequence.toSerialized());
valueSequence.subscribe(new Subscriber<StuffFetchResult>() {
    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {}

    @Override
    public void onNext(StuffFetchResult result) {
        // UI stuff
    }
});
callSequence.call(loader.getColdStuff());

I can subscribe to valueSequence here and use callSequence.call(loader.getColdStuff());, which will run the first method and produce results in onNext() of my subscription, which I can use for my UI. However, I'm not sure how to run getHotStuff() in parallel and also do a different action on it when it returns. Also getHotStuff() returns a different type of Observable so I can't really use the same callSequence?

EDIT 2

Using two subscribers, I can achieve the required behaviour I think. Not really sure if that's the right way to go about it though.

 loader.getHotStuff()
    .subscribeOn(Schedulers.io())
    .subscribe( new Subscriber<Object>() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(Object stuffWeDontCareAbout) {
            callSequence.call(loader.getColdStuff());
        }
    });

1 Answer

If I understand your scenario correctly, you may want something like this:

BehaviorSubject<Observable<T>> callSequence = BehaviorSubject.create();
Observable<T> valueSequence = Observable.switchOnNextDelayError(callSequence.toSerialized());

Your subscriber listens to valueSequence, and whenever you need to "restart", you call this:

callSequence.onNext(call.cache()); // *call* is Observable<T>

(I leave the .subscribeOn/.observeOn configuration to you.)
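
To make the "restart" idea concrete, here is a rough plain-Java model of what switching to the latest call means for the subscriber: once a newer call has been started, results from older calls no longer reach it. `LatestOnly` and `restart` are hypothetical names, not RxJava API. Unlike `switchOnNext`, this sketch does not unsubscribe from the older call, it only drops its late results, and the check-then-forward step is not serialized across threads.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Plain-Java model of "only the latest call reaches the subscriber". */
public class LatestOnly<T> {
    // Incremented on every restart; identifies which call is the current one.
    private final AtomicLong generation = new AtomicLong();
    private final Consumer<T> downstream;

    public LatestOnly(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /**
     * Starts a new call and returns the sink that call should deliver into.
     * Values arriving through a sink from an earlier call are silently dropped.
     */
    public Consumer<T> restart() {
        final long mine = generation.incrementAndGet();
        return value -> {
            if (generation.get() == mine) {
                downstream.accept(value);
            }
        };
    }
}
```

In the question's terms, each `restart()` would correspond to pushing a fresh `getColdStuff()` into `callSequence`, while the one long-lived subscriber plays the role of `downstream`.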