I have three Integer observers, defined as follows:
First Observer:
private Observer<Integer> getFirstObserver() {
    return new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(LOG_TAG, "onNext First " + integer);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };
}
Second Observer:
private Observer<Integer> getSecondObserver() {
    return new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(LOG_TAG, "onNext Second " + integer);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };
}
Third Observer:
private Observer<Integer> getThirdObserver() {
    return new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(LOG_TAG, "onNext Third " + integer);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };
}
Now if I run the following code:
void asyncSubjectDemo1() {
    Observable<Integer> observable = Observable.just(1, 2, 3, 4);
    PublishSubject<Integer> asyncSubject = PublishSubject.create();
    observable.subscribe(asyncSubject);
    asyncSubject.subscribe(getFirstObserver());
    asyncSubject.subscribe(getSecondObserver());
    asyncSubject.subscribe(getThirdObserver());
}
Nothing is printed in Logcat, which is what I expect from the documentation:
PublishSubject emits to an observer only those items that are emitted by the source Observable(s) subsequent to the time of the subscription.
But if I add observeOn when creating the Observable, as below, and run it:
void asyncSubjectDemo1() {
    Observable<Integer> observable = Observable.just(1, 2, 3, 4).observeOn(AndroidSchedulers.mainThread());
    PublishSubject<Integer> asyncSubject = PublishSubject.create();
    observable.subscribe(asyncSubject);
    asyncSubject.subscribe(getFirstObserver());
    asyncSubject.subscribe(getSecondObserver());
    asyncSubject.subscribe(getThirdObserver());
}
I get the following output:
D/MY_LOG: onNext First 1
D/MY_LOG: onNext Second 1
D/MY_LOG: onNext Third 1
D/MY_LOG: onNext First 2
D/MY_LOG: onNext Second 2
D/MY_LOG: onNext Third 2
Why does the behavior differ between these two cases?
PublishSubject doesn't observe the values until the next event loop cycle, at which point it already has active subscribers. – TrogDor
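To make the timing in that comment concrete, here is a rough plain-Java model of the two cases. It does not use RxJava: MiniSubject and the mainLoop queue are hypothetical stand-ins for PublishSubject and the main-thread message queue, and it assumes asyncSubjectDemo1() is itself called on the main thread, so work posted by observeOn can only run after the method returns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class PublishTimingSketch {

    // Hypothetical stand-in for PublishSubject: each value goes only to the
    // observers that are already subscribed when the value arrives.
    static final class MiniSubject {
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        void subscribe(Consumer<Integer> observer) {
            observers.add(observer);
        }

        void onNext(int value) {
            for (Consumer<Integer> observer : observers) {
                observer.accept(value);
            }
        }
    }

    // Returns the lines that would be logged.
    // deferEmission = false models Observable.just(...) emitting inside subscribe().
    // deferEmission = true models observeOn posting the emissions to a later loop cycle.
    public static List<String> run(boolean deferEmission) {
        List<String> log = new ArrayList<>();
        Queue<Runnable> mainLoop = new ArrayDeque<>(); // assumed model of the message queue
        MiniSubject subject = new MiniSubject();

        Runnable emitAll = () -> {
            for (int i = 1; i <= 4; i++) {
                subject.onNext(i);
            }
        };

        // Corresponds to observable.subscribe(asyncSubject).
        if (deferEmission) {
            mainLoop.add(emitAll); // emissions wait until the current work finishes
        } else {
            emitAll.run();         // emissions happen now, before anyone has subscribed
        }

        // Corresponds to the three asyncSubject.subscribe(...) calls.
        subject.subscribe(v -> log.add("onNext First " + v));
        subject.subscribe(v -> log.add("onNext Second " + v));
        subject.subscribe(v -> log.add("onNext Third " + v));

        // The demo method has returned; the loop now runs whatever was queued.
        while (!mainLoop.isEmpty()) {
            mainLoop.poll().run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("without observeOn: " + run(false));
        System.out.println("with observeOn:    " + run(true));
    }
}
```

In this model the first call logs nothing, because all four values pass through the subject while its observer list is still empty. The second call delivers every value to First, Second and Third in turn, because the emissions are only dequeued after the three subscriptions are in place.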