15
votes

I concat two observables to display data from cache firstly and after that start loading data from the network and show updated data.

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler),
    getContentFromNetwork.subscibeOn(networkScheduler)
).observeOn(AndroidSchedulers.mainThread())
 .subscribe(subscriber);

If there is no network connection the second observable fails immediately after OnSubscribe is called.

In case the second observable fails immediately, data from first observable is lost. The onNext method is never called in the subscriber.

I think, this might be due to the following code in the OperatorConcat.ConcatSubscriber

    @Override
    public void onNext(Observable<? extends T> t) {
        queue.add(nl.next(t));
        if (WIP_UPDATER.getAndIncrement(this) == 0) {
            subscribeNext();
        }
    }

    @Override
    public void onError(Throwable e) {
        child.onError(e);
        unsubscribe();
    }

Looks like after error is received it unsubscribes, and all pending onNext are lost.

What is the best way to solve my problem?

Update

Looks like I have found the solution, instead of setting observOn for concatenated observable I set observOn for each observable.

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
    getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
 .subscribe(subscriber);
3
Is order of items important for you?marwinXXII
Yes, it is important.Rostyslav Roshak

3 Answers

10
votes

Default behavior of observeOn is that onError events can jump in front of the queue, here is the quote from the docs:

Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous.

Here is small test to illustrate the thing:

Scheduler newThreadScheduler = Schedulers.newThread();

Observable<Integer> stream = Observable.create(integerEmitter -> {
    integerEmitter.onNext(1);
    integerEmitter.onNext(2);
    integerEmitter.onNext(3);
    integerEmitter.onNext(4);
    integerEmitter.onNext(5);
    integerEmitter.onError(new RuntimeException());
}, Emitter.BackpressureMode.NONE);

TestSubscriber<Integer> subscriber = new TestSubscriber<>();
stream.subscribeOn(Schedulers.computation())
        .observeOn(newThreadScheduler).subscribe(subscriber);

subscriber.awaitTerminalEvent();

subscriber.assertValues(1, 2, 3, 4, 5);
subscriber.assertError(RuntimeException.class);

Normally consumer would expect the following sequence: 1 > 2 > 3 > 4 > 5 > Error. But using just observeOn may put error ahead and test will fail.

This behavior was implemented long time ago here https://github.com/ReactiveX/RxJava/issues/1680, check for the motivation why it was done like that. To avoid such behavior one can use overloaded observeOn with delayError parameter:

indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream

This is what you normally expect, so changing observeOn(newThreadScheduler) to observeOn(newThreadScheduler, true) will fix the test.

Then to the question of @Neil: why solution proposed by @Rostyslav is working? It is working, because there is no thread switch for the final sequence.

In the proposed solution final sequence is crafted from two sequences on the same thread: 1st sequence is data from cache, 2nd sequence is just error from network. They are crafted together on the same thread and after there is no thread switch - subscriber observes on the AndroidSchedulers.mainThread(). If you try to change final Scheduler to some other, it will fail again.

2
votes

Operators in RxJava are designed to short-circuit onError notifications in general. Because the observables being concatenated are asynchronous sources then you are experiencing the short-circuit. If you don't want the short-circuit then you could do a concat on materialized observables and then perform the processing you desire:

Observable.concat(
    getContentFromCache.materialize().subscribeOn(dbScheduler),
    getContentFromNetwork.materialize().subscribeOn(networkScheduler)
)

Another approach would be to use onErrorResumeNext:

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler),
    getContentFromNetwork.onErrorResumeNext(something)
        .subscibeOn(networkScheduler)
)
0
votes
This is the small Example for concat both Observable

// First Observable //   
 private Observable<Book> getAutobiography()
        {
            String [] booknames= new String[]{"book1","book2"};
            final ArrayList<Book> arrayList = new ArrayList<>();
            for(String name:booknames){
                Book book = new Book();
                book.setBooktype("Autobiograph");
                book.setBookname(name);
                arrayList.add(book);
            }

            return Observable.create(new ObservableOnSubscribe<Book>() {
                @Override
                public void subscribe(ObservableEmitter<Book> emitter) throws Exception {

                    for(Book book:arrayList)
                    {
                        emitter.onNext(book);

                    }
                    if (!emitter.isDisposed()) {
                        emitter.onComplete();
                    }
                }
            }).subscribeOn(Schedulers.io());
        }

// Second Observable 



    private Observable<Book> getProgrammingBooks()
        {
            String [] booknames= new String[]{"book3","book4"};
            final ArrayList<Book> arrayList = new ArrayList<>();
            for(String name:booknames){
                Book book = new Book();
                book.setBooktype("Programming");
                book.setBookname(name);
                arrayList.add(book);
            }

            return Observable.create(new ObservableOnSubscribe<Book>() {
                @Override
                public void subscribe(ObservableEmitter<Book> emitter) throws Exception {

                    for(Book book:arrayList)
                    {
                        emitter.onNext(book);

                    }
                    if (!emitter.isDisposed()) {
                        emitter.onComplete();
                    }
                }
            }).subscribeOn(Schedulers.io());
        }

// Concat them 

    Observable.concat(getProgrammingBooks(),getAutobiography()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Book>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Book book) {


            Log.d("bookname",""+book.getBookname());
            Log.d("booktype",""+book.getBooktype());
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });
// it print both values //