I have three Integer observers, shown below:

First Observer:

    private Observer<Integer> getFirstObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(LOG_TAG, "onNext First " + integer);

            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {

            }
        };
    }

Second Observer:

    private Observer<Integer> getSecondObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(LOG_TAG, "onNext Second " + integer);

            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };
    }

Third Observer:

    private Observer<Integer> getThirdObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(LOG_TAG, "onNext Third " + integer);

            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };
    }

Now if I run the following code:

    void asyncSubjectDemo1() {
        Observable<Integer> observable = Observable.just(1, 2, 3, 4);
        PublishSubject<Integer> asyncSubject = PublishSubject.create();
        observable.subscribe(asyncSubject);
        asyncSubject.subscribe(getFirstObserver());
        asyncSubject.subscribe(getSecondObserver());
        asyncSubject.subscribe(getThirdObserver());

    }

Nothing is printed in Logcat, which is what I expect from the documentation:

PublishSubject emits to an observer only those items that are emitted by the source Observable(s) subsequent to the time of the subscription.

But if I add observeOn when creating the Observable, as below, and run it:

    void asyncSubjectDemo1() {
        Observable<Integer> observable = Observable.just(1, 2, 3, 4).observeOn(AndroidSchedulers.mainThread());
        PublishSubject<Integer> asyncSubject = PublishSubject.create();
        observable.subscribe(asyncSubject);
        asyncSubject.subscribe(getFirstObserver());
        asyncSubject.subscribe(getSecondObserver());
        asyncSubject.subscribe(getThirdObserver());

    }

I get the following output:

D/MY_LOG: onNext First 1
D/MY_LOG: onNext Second 1
D/MY_LOG: onNext Third 1
D/MY_LOG: onNext First 2
D/MY_LOG: onNext Second 2
D/MY_LOG: onNext Third 2

Why does the behavior differ between these two cases?

From what I understand, Android's main thread runs an event loop. My guess is that you're building the Rx pipeline in one iteration of the loop, and in the latter example the PublishSubject doesn't observe the values until the next event loop cycle, at which point it already has active subscribers. - TrogDor

1 Answer

Please read the Javadoc of PublishSubject: http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/subjects/PublishSubject.html

"a PublishSubject doesn't retain/cache items, therefore, a new Observer won't receive any past items."

In the first case, you subscribe a PublishSubject to a synchronous source, so all items pass through the subject right at that point, before execution even reaches asyncSubject.subscribe(getFirstObserver());
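The first case can be reproduced without RxJava or Android. The sketch below is only a plain-Java toy model: MiniSubject, emitSynchronously and the two helper methods are hypothetical names made up for illustration, and the class keeps just the one property that matters here (items are forwarded to current subscribers and never cached). It is not how PublishSubject is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SyncSubjectSketch {

    // Hypothetical stand-in for a publish-style subject: it forwards each
    // item to whoever is subscribed right now and keeps no history.
    static final class MiniSubject {
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        void subscribe(Consumer<Integer> observer) {
            observers.add(observer);
        }

        void onNext(int item) {
            for (Consumer<Integer> observer : observers) {
                observer.accept(item);
            }
        }
    }

    // Models a synchronous source: every item is pushed into the subject
    // before this method returns.
    static void emitSynchronously(MiniSubject subject) {
        for (int i = 1; i <= 4; i++) {
            subject.onNext(i);
        }
    }

    // Source first, observer second (the order used in the question).
    static List<Integer> sourceThenObserver() {
        List<Integer> received = new ArrayList<>();
        MiniSubject subject = new MiniSubject();
        emitSynchronously(subject);        // all items pass through here
        subject.subscribe(received::add);  // too late, nothing is replayed
        return received;
    }

    // Observer first, source second.
    static List<Integer> observerThenSource() {
        List<Integer> received = new ArrayList<>();
        MiniSubject subject = new MiniSubject();
        subject.subscribe(received::add);  // already listening
        emitSynchronously(subject);        // every item is delivered
        return received;
    }

    public static void main(String[] args) {
        System.out.println("source first:   " + sourceThenObserver());
        System.out.println("observer first: " + observerThenSource());
    }
}
```

In this model the only thing that changes the result is the order of the two calls, which suggests that subscribing the observers to the subject before subscribing the subject to the source would deliver all four items in the synchronous case as well.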

In the second case, the source is scheduled, so when you subscribe the PublishSubject to it you create a window or a race (depending on where the method executes). The calls asyncSubject.subscribe(getFirstObserver()); etc. therefore have a chance to subscribe to the subject in time and thus receive the items later on.
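The "window" in the second case can be sketched in plain Java as well, under the assumption that the method runs on the same thread that later delivers the items (as it would when called on Android's main thread). Here pendingTasks is a hypothetical stand-in for that thread's message queue and MiniSubject is again an illustrative toy, not RxJava's class. The real observeOn operator is more involved, so treat this as a model of the timing only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class QueuedSubjectSketch {

    // Hypothetical publish-style subject: forwards items to the observers
    // subscribed at that moment and caches nothing.
    static final class MiniSubject {
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        void subscribe(Consumer<Integer> observer) {
            observers.add(observer);
        }

        void onNext(int item) {
            for (Consumer<Integer> observer : observers) {
                observer.accept(item);
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Stand-in for the event loop's queue of work to run later.
        Queue<Runnable> pendingTasks = new ArrayDeque<>();
        MiniSubject subject = new MiniSubject();

        // "Subscribing the subject to a scheduled source": delivery is only
        // posted to the queue, nothing is emitted yet.
        pendingTasks.add(() -> {
            for (int i = 1; i <= 4; i++) {
                subject.onNext(i);
            }
        });

        // The current method keeps running, so these subscriptions happen
        // before the posted task gets a chance to execute.
        subject.subscribe(item -> log.add("First " + item));
        subject.subscribe(item -> log.add("Second " + item));

        // The current method is done; the loop now drains the queue.
        Runnable task;
        while ((task = pendingTasks.poll()) != null) {
            task.run();
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Because both observers are registered before the queued task runs, each item reaches them in subscription order, which gives the same interleaving as the Logcat output in the question. If the delivering thread differs from the subscribing thread, the subscriptions and the emission can overlap, which is the race mentioned above.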