0
votes

I have an application that uses a ConnectableObservable that runs for a long time. Mysteriously after some time its observer stopped getting notifications in its onNext() method.

I have written the following test that simplifies the example. It's just a ConnectableObservable with an infinite loop, with one subscriber using both observeOn and subscribeon. After 128 s.onNext(1) calls it stops notifying the observer.

@Test
public void testHotObservable() throws InterruptedException{

    CountDownLatch latch = new CountDownLatch(1);

    ConnectableObservable<Integer> observable = Observable.<Integer>create( (s) -> {
        while(true){
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            s.onNext(1);
        }
    })
    .observeOn(Schedulers.io())
    .subscribeOn(Schedulers.io())
    .publish();

    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onNext(Integer i) {
            System.out.println("got "+i);
        }
        @Override
        public void onCompleted() {
            System.out.println("completed");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    };

    observable.subscribe(observer);
    observable.connect();

    latch.await();
}

This is what I've seen debugging RxJava's code I have found out the reason why it doesn't call the Observer's onNext() method but I don't understand it:

1.- s.onNext(1); is called:

2.- The execution gets to rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber.pollQueue():

void pollQueue() {
    int emitted = 0;
    final AtomicLong localRequested = this.requested;
    final AtomicLong localCounter = this.counter;
    do {
        localCounter.set(1);
        long produced = 0;
        long r = localRequested.get();            
        for (;;) {
            ...
            System.out.println("R: "+r);
            if (r > 0) {
                Object o = queue.poll();
                if (o != null) {
                    child.onNext(on.getValue(o));
                    r--;

The problem is the value of r. The first time it executes its value is always 128. After each call it decrements by 1 (r--). This means that ConnectableObservable can only notify its observers 128 times when using both observeOn and subscribeOn. If I remove subscribeOn, r's value starts over each iteration and it works.

UPDATE:

I found a solution: the problem was caused by the order of the .observerOn().subscribeOn(). If I reverse it to .subscribeOn().observeOn() it works (I can see that the value of r is always reset to 128).

Anyway I'd appreciate an explanation.

1
Are state and signal volatile variables?akarnokd
Nope...is it neccesary?codependent
Yes, otherwise changes across threads may not be visible or the variable read may be hoisted out of the loop entirely.akarnokd
Added volatile but it still stops after some time...I'm wondering if it could be because of the signal = null; statement right after the s.onNext(signal);... I doesn't make sense though.codependent
Did you subscribe to the observable? It seems nothing consumes your signal and the internal buffer just fills in.akarnokd

1 Answers

1
votes

Many async operators use internal, fixed size buffers and rely on subscribers requesting requently. In your case, something doesn't request properly which I can't say what it is. I suggest trying your use case with standard components to see what could be wrong, i.e., you can replace your custom Observable with a PublishSubject + sample:

Subject<Integer, Integer> source = PublishSubject.<Integer>create().toSerialized();

ConnectableObservable<Integer> co = source.sample(
    500, TimeUnit.MILLISECONDS, Schedulers.io())
.onBackpressureBuffer().publish();

co.subscribe(yourSubscriber);
co.connect();

source.onNext(1);