I'm playing around with RxJava/RxAndroid and there's something very basic that doesn't behave as I'd expect. I have one observable and two subscribers:
Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: " + v));
Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: " + v));
And this is the output:
D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
Now, I know I could avoid repeating the count by using publish().autoConnect(), but I'm trying to understand this default behaviour first.
Each time someone subscribes to the observable, it starts emitting the number sequence. I get that. So when Subscriber 1 connects, it starts emitting items. Subscriber 2 connects right away, so why isn't it getting the values as well?
This is how I understand it, from the perspective of the observable:
Someone subscribed to me, I should start emitting items [SUBSCRIBERS: 1][ITEMS TO EMIT: 1,2,3]
Emit item '1' to subscribers [SUBSCRIBERS: 1][ITEMS TO EMIT: 2,3]
Someone else subscribed to me, I'll emit 1,2,3 once again when I'm done [SUBSCRIBERS: 1 & 2][ITEMS TO EMIT: 2,3,1,2,3]
Emit item '2' to subscribers [SUBSCRIBERS: 1 & 2][ITEMS TO EMIT: 3,1,2,3]
Emit item '3' to subscribers [SUBSCRIBERS: 1 & 2][ITEMS TO EMIT: 1,2,3]
Emit item '1' to subscribers [SUBSCRIBERS: 1 & 2][ITEMS TO EMIT: 2,3]
...
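To make the steps above concrete, here is a minimal plain-Java sketch of the behaviour I expected. It does not use RxJava at all, and the class and method names (SharedQueueModel, subscribe, emitNext, demo) are made up for illustration. It only models my assumption of one shared queue and one shared subscriber list, which is not what RxJava actually does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of the behaviour I expected; NOT how RxJava works.
class SharedQueueModel {
    private final List<Integer> source = List.of(1, 2, 3);
    private final Deque<Integer> pending = new ArrayDeque<>();
    private final List<String> subscribers = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    // Every new subscriber queues one more full round of the source items.
    void subscribe(String name) {
        subscribers.add(name);
        pending.addAll(source);
    }

    // Deliver the next pending item to every current subscriber.
    // Returns false once the queue is empty.
    boolean emitNext() {
        Integer item = pending.poll();
        if (item == null) {
            return false;
        }
        for (String s : subscribers) {
            log.add(s + ": " + item);
        }
        return true;
    }

    // Replays the scenario from the list: the second subscriber
    // arrives after item 1 has already been emitted.
    static List<String> demo() {
        SharedQueueModel m = new SharedQueueModel();
        m.subscribe("Subscriber #1");
        m.emitNext();                 // item 1 reaches only Subscriber #1
        m.subscribe("Subscriber #2"); // queue is now 2,3,1,2,3
        while (m.emitNext()) {
            // keep emitting until the shared queue is drained
        }
        return m.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model both subscribers receive every item emitted after they joined, so the lines for Subscriber #1 and Subscriber #2 would be interleaved rather than grouped.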
But this is not how it works. It's like they are two separate observables in one. This confuses me: why don't they give the items to all subscribers?
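For contrast, this is roughly what the log suggests is happening, again as a plain-Java sketch with made-up names (PerSubscriberModel, subscribe, demo). It assumes every subscribe call replays the whole sequence for that one subscriber only, and it runs synchronously, so it ignores whatever subscribeOn and observeOn do with threads. It is a guess at the observable behaviour, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy model of what the log suggests; a sketch, not RxJava's code.
class PerSubscriberModel {
    private final List<Integer> source = List.of(1, 2, 3);

    // Each call replays the whole sequence for that one consumer only;
    // nothing is shared between subscribers.
    void subscribe(Consumer<Integer> consumer) {
        for (Integer item : source) {
            consumer.accept(item);
        }
    }

    // Two subscribers, each getting its own independent run of 1, 2, 3.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        PerSubscriberModel m = new PerSubscriberModel();
        m.subscribe(v -> log.add("Subscriber #1: " + v));
        m.subscribe(v -> log.add("Subscriber #2: " + v));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

This sketch produces the same grouping of values as my output: all of Subscriber #1's items, then all of Subscriber #2's.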
Bonus:
How is it that publish().autoConnect() fixes the problem?
Let's break it down. publish() gives me a connectable observable. A connectable observable is just like a regular observable, but you can tell it when to connect. Then I go ahead and tell it to connect right away by calling autoConnect(). By doing so, don't I get the same thing I started with, a plain regular observable? The operators appear to cancel each other.
I could just shut up and use publish().autoConnect(), but I'd like to understand more about how observables work.
Thanks!