I am new to RxJava and was under the impression that every subscriber is notified of each event as it happens. So if we have N subscribers and a stream of X events, onNext would be called on each of the N subscribers for every event. But when I run the following code:

public static void main(String[] args) {
    Observable<String> source = Observable.create(emitter -> {
        emitter.onNext("Hello");
        emitter.onNext("Foo");
        emitter.onNext("Bar");
        emitter.onNext("RxJava");
    });

    source.subscribe(e -> System.out.println("Observer 1: " + e));
    source.subscribe(e -> System.out.println("Observer 2: " + e));
}

I see:

Observer 1: Hello
Observer 1: Foo
Observer 1: Bar
Observer 1: RxJava
Observer 2: Hello
Observer 2: Foo
Observer 2: Bar
Observer 2: RxJava  

So basically the second observer is only triggered after all the onNext calls for the first observer are done.

I was expecting to see:

Observer 1: Hello 
Observer 2: Hello
Observer 1: Foo
Observer 2: Foo
Observer 1: Bar
Observer 2: Bar
Observer 1: RxJava
Observer 2: RxJava 

That seems inefficient to me for very long streams. Am I doing something wrong?

1 Answer

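RxJava sequences are synchronous by default, so each subscribe call above runs your emission code right there and only returns once all four items have been emitted. The source is also cold: every subscribe runs the create lambda again from the start, which is why Observer 2 gets its own full pass. To achieve the interleaving, you need a way to tell the source when both consumers are ready to receive. This can be done in several ways: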

ConnectableObservable<String> source = Observable.<String>create(emitter -> {
    emitter.onNext("Hello");
    emitter.onNext("Foo");
    emitter.onNext("Bar");
    emitter.onNext("RxJava");
}).publish();

source.subscribe(e -> System.out.println("Observer 1: " + e));
source.subscribe(e -> System.out.println("Observer 2: " + e));

source.connect();

or

// refCount(2) returns a plain Observable (not a ConnectableObservable)
// and connects to the source once two observers have subscribed.
Observable<String> source = Observable.<String>create(emitter -> {
    emitter.onNext("Hello");
    emitter.onNext("Foo");
    emitter.onNext("Bar");
    emitter.onNext("RxJava");
}).publish().refCount(2);

source.subscribe(e -> System.out.println("Observer 1: " + e));
source.subscribe(e -> System.out.println("Observer 2: " + e));
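
If it helps to see why the two orderings come out differently, here is a rough plain-Java model of the idea with no RxJava dependency. It is only a sketch and not how RxJava is implemented: the class ColdVsPublished and the methods emit, runCold and runPublished are hypothetical names made up for illustration. The cold case runs the emission code once per consumer, while the publish/connect case registers both consumers first and then runs the emission code a single time, fanning each item out to everyone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ColdVsPublished {

    // Hypothetical stand-in for the body passed to Observable.create:
    // pushes the four items to whichever consumer it is given.
    static void emit(Consumer<String> downstream) {
        downstream.accept("Hello");
        downstream.accept("Foo");
        downstream.accept("Bar");
        downstream.accept("RxJava");
    }

    // Cold, synchronous behaviour: each "subscribe" runs the whole
    // emission code before the next subscribe even starts.
    static List<String> runCold() {
        List<String> log = new ArrayList<>();
        emit(e -> log.add("Observer 1: " + e));
        emit(e -> log.add("Observer 2: " + e));
        return log;
    }

    // publish()/connect()-like behaviour: collect the consumers first,
    // then run the emission code once and hand each item to all of them.
    static List<String> runPublished() {
        List<String> log = new ArrayList<>();
        List<Consumer<String>> observers = new ArrayList<>();
        observers.add(e -> log.add("Observer 1: " + e));
        observers.add(e -> log.add("Observer 2: " + e));
        // The "connect" step: a single pass over the source.
        emit(e -> observers.forEach(o -> o.accept(e)));
        return log;
    }

    public static void main(String[] args) {
        runCold().forEach(System.out::println);
        System.out.println("---");
        runPublished().forEach(System.out::println);
    }
}
```

The first half of the output matches what the question observed and the second half matches what it expected. Note that in the published case the source does its work only once, which also addresses the efficiency concern for long streams.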