0
votes

I have a list of objects that I want to retrieve from a local database (if available), or from a remote server otherwise. I'm using RxJava Observables (SqlBrite for the database and Retrofit for the remote server).

My query code is as follows:

Observable<List<MyObject>> dbObservable = mDatabase
            .createQuery(MyObject.TABLE_NAME,MyObject.SELECT_TYPE_A)
            .mapToList(MyObject.LOCAL_MAPPER);
Observable<List<MyObject>> remoteObservable = mRetrofitService.getMyObjectApiService().getMyObjects();

return Observable.concat(dbObservable, remoteObservable)
    .first(new Func1<List<MyObject>, Boolean>() {
                @Override
                public Boolean call(List<MyObject> myObjects) {
                    return !myObjects.isEmpty();
                }
            });

I see the first observable run and reach the first() predicate with an empty list, but the Retrofit observable then never runs and no network request is made. If I switch the order of the observables, or return only the remote observable, it works as expected: it hits the remote server and returns the list of objects.

Why would the remote observable fail to run in this scenario? The subscriber's onNext, onError and onCompleted methods are not called when I concatenate the observables with the db first and Retrofit second.

Thanks!

2 Answers

5
votes

Kaushik Gopal has addressed this in his RxJava-Android-Samples GitHub project.

He recommends using this technique:

getFreshNetworkData()
          .publish(network ->
                         Observable.merge(network,
                                          getCachedDiskData().takeUntil(network)))
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Subscriber<List<MyObject>>() {

              ...
          });

In your case, it might look like this:

remoteObservable
      .publish(network ->
                     Observable.merge(network,
                                      dbObservable.takeUntil(network)))
      .first(myObjects -> !myObjects.isEmpty());

Edit: It sounds like you just might need this:

dbObservable
    .flatMap(localResult -> {
        if (localResult.isEmpty()) {
            return remoteObservable;
        } else {
            return Observable.just(localResult);
        }
    });
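
The branching in the flatMap version reacts to the local result as soon as it is emitted, whereas concat only subscribes to its second source once the first has completed, which matters if the database observable stays open. As a rough, library-free illustration of the same "use local unless it is empty" decision, here is a sketch using the JDK's CompletableFuture, where thenCompose plays the role of flatMap. The class name LocalThenRemote, the load method and the String item type are hypothetical stand-ins, not part of RxJava, SqlBrite or Retrofit.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch: local-first lookup with a lazy remote fallback. */
final class LocalThenRemote {

    /**
     * Returns the local result if it is non-empty; otherwise asks the
     * remote supplier. The supplier is only invoked when it is needed,
     * so no "network request" happens when the local data is usable.
     */
    static CompletableFuture<List<String>> load(
            CompletableFuture<List<String>> local,
            Supplier<CompletableFuture<List<String>>> remote) {
        return local.thenCompose(localResult -> {
            if (localResult.isEmpty()) {
                return remote.get();
            } else {
                return CompletableFuture.completedFuture(localResult);
            }
        });
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<List<String>>> remote =
                () -> CompletableFuture.completedFuture(Arrays.asList("r1", "r2"));

        // Empty local result: the remote list is returned.
        CompletableFuture<List<String>> emptyLocal =
                CompletableFuture.completedFuture(Collections.<String>emptyList());
        System.out.println(load(emptyLocal, remote).join());

        // Non-empty local result: the remote supplier is never called.
        CompletableFuture<List<String>> cachedLocal =
                CompletableFuture.completedFuture(Arrays.asList("l1"));
        System.out.println(load(cachedLocal, remote).join());
    }
}
```

Passing the remote call as a Supplier mirrors the laziness of an unsubscribed Observable: nothing is fetched unless the empty-list branch is taken.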
2
votes

I assume you have observables that fetch data from your local and remote sources, like this:

        final Observable<Page> localResult = mSearchLocalDataSource.search(query);
        final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        if (page != null) {
                            mSearchLocalDataSource.save(query, page);
                            mResultCache.put(query, page);
                        }
                    }
                });

Then you can concatenate them and take the first item, so the local result is used if it is available and the remote result otherwise:

        return Observable.concat(localResult, remoteResult)
                .first()
                .map(new Func1<Page, Page>() {
                    @Override
                    public Page call(Page page) {
                        if (page == null) {
                            throw new NoSuchElementException("No result found!");
                        }
                        return page;
                    }
                });

Then subscribe to it like this:

        mCompositeSubscription.clear();
        final Subscription subscription = mSearchRepository.search(this.mQuery)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Page>() {
                    @Override
                    public void onCompleted() {
                        // Completed
                    }

                    @Override
                    public void onError(Throwable e) {
                        mView.onDefaultMessage(e.getMessage());
                    }

                    @Override
                    public void onNext(Page page) {
                        mView.onDefaultMessage(page.getContent());
                    }
                });

        mCompositeSubscription.add(subscription);

For more detail or a full example, you can check my GitHub repo: https://github.com/savepopulation/wikilight

Good luck!

Edit:

You can try a local observable like the one below. It checks whether a record exists and, if there is none, completes without emitting anything, so it behaves as an empty observable.

@Override
public Observable<Page> search(@NonNull final String query) {
    return Observable.create(new Observable.OnSubscribe<Page>() {
        @Override
        public void call(Subscriber<? super Page> subscriber) {
            final Realm realm = Realm.getInstance(mRealmConfiguration);
            final Page page = realm.where(Page.class)
                    .equalTo("query", query)
                    .findFirst();
            if (page != null && page.isLoaded() && page.isValid()) {
                Log.i("data from", "realm");
                subscriber.onNext(realm.copyFromRealm(page));
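            }
            // No record found: emit nothing and just complete below, so this
            // observable is empty and concat() can move on to the remote source.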
            subscriber.onCompleted();
            realm.close();
        }
    });
}

Edit 2:

When the local observable emits null, concat and first will not work as intended and your remote will never be called, because null still counts as an emitted item: first() takes it and the chain ends there. When the local observable completes without emitting anything (like Observable.empty()), concat moves on to the remote observable and first() takes the item that the remote emits.
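
To make the difference between "emits null" and "emits nothing" concrete, here is a small plain-Java model of concat(local, remote).first(). It is not RxJava itself: each source is modelled as a supplier of the list of items it would emit before completing, and the names ConcatFirstModel and concatFirst are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/** Plain-Java model of concat(local, remote).first(); not RxJava itself. */
final class ConcatFirstModel {

    /**
     * Each source is modelled as a supplier of the items it emits before
     * completing. Sources are consulted lazily and in order, like concat().
     */
    static String concatFirst(Supplier<List<String>> local,
                              Supplier<List<String>> remote) {
        List<String> fromLocal = local.get();
        if (!fromLocal.isEmpty()) {
            // An emitted item ends the search, even when that item is null.
            return fromLocal.get(0);
        }
        // Local completed without emitting, so the remote source is consulted.
        List<String> fromRemote = remote.get();
        if (fromRemote.isEmpty()) {
            throw new NoSuchElementException("neither source emitted an item");
        }
        return fromRemote.get(0);
    }

    public static void main(String[] args) {
        Supplier<List<String>> remote =
                () -> Collections.singletonList("remote page");

        // Local emits null: the null is taken as the first item, remote is skipped.
        Supplier<List<String>> nullLocal = () -> Collections.singletonList(null);
        System.out.println(concatFirst(nullLocal, remote));

        // Local completes empty: the remote item is used.
        Supplier<List<String>> emptyLocal = Collections::emptyList;
        System.out.println(concatFirst(emptyLocal, remote));
    }
}
```

In this model the first call returns null and never touches the remote supplier, while the second call returns the remote item, which matches the behaviour described in Edit 2.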