I have an observable chain that hangs. After some research I found that replacing subscribe(subscriber) with the overload that takes only an onNext action fixes the problem. The only difference I can see between the two is that in the hanging variant my subscriber is wrapped in a SafeSubscriber, which unsubscribes in onCompleted. Still, I don't understand why it hangs. Can someone explain this or point me to an explanation?

This code hangs:

Scheduler sch = Schedulers.from(Executors.newSingleThreadExecutor());

Observable
    .create(subscriber -> Observable.just(null).subscribe(subscriber))
    .flatMap(ignored -> Observable.just(null).observeOn(sch))
    .toBlocking()
    .first();

This doesn't:

Scheduler sch = Schedulers.from(Executors.newSingleThreadExecutor());

Observable
    .create(subscriber -> Observable.just(null).subscribe(value -> subscriber.onNext(value)))
    .flatMap(ignored -> Observable.just(null).observeOn(sch))
    .toBlocking()
    .first();

The construct may look strange, but I simplified it to the shortest example that reproduces the problem. The real code is much more complicated.

1 Answer

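The problem is subscriber -> Observable.just(null).subscribe(subscriber). Never reuse a Subscriber, because it is stateful. The Subscriber passed to Observable.OnSubscribe.call has already been subscribed, so don't call subscribe with it again. As you noticed, subscribe wraps it in a SafeSubscriber that unsubscribes once onCompleted has been delivered. Because the wrapper shares the original Subscriber's subscription state, that unsubscribe most likely cancels the downstream flatMap before the value scheduled by observeOn arrives, which leaves first() waiting forever. If you find you need to call subscribe inside Observable.OnSubscribe, consider using lift instead.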
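To make the "stateful subscriber" point concrete, here is a minimal toy model in plain Java. It does not use RxJava, and every class in it (SubscriptionState, Downstream, SafeWrapper) is hypothetical and only loosely mirrors the behaviour described above. It assumes that a wrapper which shares cancellation state with the downstream subscriber cancels it on completion, and that delivery deferred to another thread is dropped once cancelled.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical and are NOT RxJava's real types.
class SharedSubscriptionSketch {

    /** Cancellation flag that several subscribers may share. */
    static final class SubscriptionState {
        private boolean unsubscribed;
        void unsubscribe() { unsubscribed = true; }
        boolean isUnsubscribed() { return unsubscribed; }
    }

    /** Downstream consumer whose deferred delivery is dropped once it is cancelled. */
    static final class Downstream {
        final SubscriptionState state = new SubscriptionState();
        final List<String> received = new ArrayList<>();
        // Stands in for work queued on another thread (the observeOn step).
        final List<Runnable> pending = new ArrayList<>();

        void onNext(String value) {
            pending.add(() -> {
                if (!state.isUnsubscribed()) {
                    received.add(value);
                }
            });
        }

        void onCompleted() { /* nothing to do in this model */ }

        void runPending() {
            pending.forEach(Runnable::run);
            pending.clear();
        }
    }

    /** Wrapper that forwards events and cancels the given state after completion. */
    static final class SafeWrapper {
        private final Downstream actual;
        private final SubscriptionState state;

        SafeWrapper(Downstream actual, SubscriptionState state) {
            this.actual = actual;
            this.state = state;
        }

        void onNext(String value) { actual.onNext(value); }

        void onCompleted() {
            try {
                actual.onCompleted();
            } finally {
                state.unsubscribe();
            }
        }
    }

    /** Emits one value and completes immediately. */
    static void emitOnce(SafeWrapper s) {
        s.onNext("value");
        s.onCompleted();
    }

    /** Reuses the downstream's own state: inner completion cancels the downstream. */
    static List<String> reuseSubscriber() {
        Downstream d = new Downstream();
        emitOnce(new SafeWrapper(d, d.state));
        d.runPending();
        return d.received;
    }

    /** Gives the inner subscription its own state: the downstream stays subscribed. */
    static List<String> forwardWithFreshState() {
        Downstream d = new Downstream();
        emitOnce(new SafeWrapper(d, new SubscriptionState()));
        d.runPending();
        return d.received;
    }

    public static void main(String[] args) {
        System.out.println("reused:    " + reuseSubscriber());
        System.out.println("forwarded: " + forwardWithFreshState());
    }
}
```

In this model the reused subscriber never receives the deferred value, while the forwarding variant does. That corresponds to the difference between passing the subscriber straight to subscribe and forwarding onNext through a separate subscriber, as in the working snippet from the question.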