I have an RxJava observable chain that hangs. After some investigation I found that replacing subscribe(subscriber) with the overload that takes only an onNext action fixes the problem. As far as I can tell, the only difference between the two is that in the hanging variant my subscriber is wrapped in a SafeSubscriber, which unsubscribes in onCompleted. Even so, I don't understand why it hangs. Can someone explain this or point me to a relevant resource?
This code hangs:
Scheduler sch = Schedulers.from(Executors.newSingleThreadExecutor());
Observable
    // Passes the downstream subscriber directly to the inner source.
    .create(subscriber -> Observable.just(null).subscribe(subscriber))
    .flatMap(ignored -> Observable.just(null).observeOn(sch))
    .toBlocking()
    .first();
This doesn't:
Scheduler sch = Schedulers.from(Executors.newSingleThreadExecutor());
Observable
    // Forwards only onNext; onCompleted and onError are not passed on.
    .create(subscriber -> Observable.just(null).subscribe(value -> subscriber.onNext(value)))
    .flatMap(ignored -> Observable.just(null).observeOn(sch))
    .toBlocking()
    .first();
The construct may look strange, but note that I have reduced it to the shortest example that still reproduces the problem. The real code is much more complicated.
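The difference between the two variants can be modelled without RxJava. The following is only a minimal sketch under stated assumptions: Downstream, subscribeWithSubscriber and subscribeWithOnNext are hypothetical names, not RxJava classes or methods, and the sketch assumes that the wrapper created by subscribe(subscriber) shares its subscription with the wrapped subscriber, so that unsubscribing after onCompleted also unsubscribes downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SafeSubscriberSketch {

    /** Hypothetical stand-in for the downstream subscriber; not an RxJava class. */
    static final class Downstream {
        final List<String> events = new ArrayList<>();
        boolean unsubscribed = false;

        void onNext(Object value) { events.add("onNext"); }
        void onCompleted() { events.add("onCompleted"); }
        void unsubscribe() { unsubscribed = true; }
    }

    /** Models subscribe(subscriber): every signal is forwarded, then the wrapper unsubscribes. */
    static void subscribeWithSubscriber(Downstream downstream) {
        downstream.onNext(null);
        downstream.onCompleted();
        // Assumption: the wrapper shares its subscription with the wrapped
        // subscriber, so unsubscribing the wrapper unsubscribes downstream too.
        downstream.unsubscribe();
    }

    /** Models subscribe(onNext): only the value reaches downstream. */
    static void subscribeWithOnNext(Consumer<Object> onNext) {
        onNext.accept(null);
        // Completion and unsubscription stop at the lambda's own wrapper.
    }

    public static void main(String[] args) {
        Downstream hanging = new Downstream();
        subscribeWithSubscriber(hanging);
        System.out.println(hanging.events + " unsubscribed=" + hanging.unsubscribed);

        Downstream working = new Downstream();
        subscribeWithOnNext(working::onNext);
        System.out.println(working.events + " unsubscribed=" + working.unsubscribed);
    }
}
```

The sketch only shows which signals reach the downstream subscriber in each variant. It does not model schedulers or observeOn, so on its own it does not explain the hang.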