I have an application that uses a ConnectableObservable that runs for a long time. Mysteriously after some time its observer stopped getting notifications in its onNext() method.
I have written the following test that simplifies the example. It's just a ConnectableObservable with an infinite loop, with one subscriber using both observeOn and subscribeon. After 128 s.onNext(1)
calls it stops notifying the observer.
@Test
public void testHotObservable() throws InterruptedException{
CountDownLatch latch = new CountDownLatch(1);
ConnectableObservable<Integer> observable = Observable.<Integer>create( (s) -> {
while(true){
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(1);
}
})
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.io())
.publish();
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onNext(Integer i) {
System.out.println("got "+i);
}
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
};
observable.subscribe(observer);
observable.connect();
latch.await();
}
This is what I've seen debugging RxJava's code I have found out the reason why it doesn't call the Observer's onNext() method but I don't understand it:
1.- s.onNext(1);
is called:
2.- The execution gets to rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber.pollQueue()
:
void pollQueue() {
int emitted = 0;
final AtomicLong localRequested = this.requested;
final AtomicLong localCounter = this.counter;
do {
localCounter.set(1);
long produced = 0;
long r = localRequested.get();
for (;;) {
...
System.out.println("R: "+r);
if (r > 0) {
Object o = queue.poll();
if (o != null) {
child.onNext(on.getValue(o));
r--;
The problem is the value of r. The first time it executes its value is always 128. After each call it decrements by 1 (r--
). This means that ConnectableObservable can only notify its observers 128 times when using both observeOn and subscribeOn. If I remove subscribeOn, r's value starts over each iteration and it works.
UPDATE:
I found a solution: the problem was caused by the order of the .observerOn().subscribeOn()
. If I reverse it to .subscribeOn().observeOn()
it works (I can see that the value of r
is always reset to 128).
Anyway I'd appreciate an explanation.
state
andsignal
volatile variables? – akarnokdsignal = null;
statement right after thes.onNext(signal);
... I doesn't make sense though. – codependent