I'm seeing some unexpected results when using combineLatest on a collection of cold observables. Instead of emitting only the latest value from every Observable, it combines the latest from the first (n-1) Observables with each element from the nth Observable.

import RxSwift

let disposeBag = DisposeBag()
let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")

let latestObserver = Observable.combineLatest(observable, observable2)

latestObserver
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

Produces the output: (4, "bed") (4, "book") (4, "table")

I had expected to see output of just (4, "table").

If I change the order of the observables like so:

let latestObserver = Observable.combineLatest(observable2, observable)

Then I get the output: ("table", 1) ("table", 2) ("table", 3) ("table", 4)

If I add a final arbitrary Observable then I see just the latest from each of the first ones:

let observable = ReplaySubject<Int>.createUnbounded()
let observable2 = ReplaySubject<String>.createUnbounded()
let observable3 = Observable<Int>.just(42)

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

observable2.onNext("bed")
observable2.onNext("book")
observable2.onNext("table")

let latestObserver = Observable.combineLatest(observable, observable2, observable3)

latestObserver
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

This produces the output: (4, "table", 42)

Is this really the expected behaviour?

Based on this question, and your last one, I get the impression that you are having trouble tracking which observables are synchronous and which are asynchronous... - Daniel T.
@DanielT. Indeed. Without going into too much detail, it stems from a protocol specification of a model as an input to a view controller. The protocol specifies Observables where I had previously been used to seeing a BehaviorRelay or BehaviorSubject. The view controller should only update its display and controls from the latest values, and ignore any intermediate replayed values. I'm just trying to understand the ramifications of various possible future concrete implementations of the protocol, which may be beyond my control. I thought I understood things but caught some surprises. Thx - Dale
Ah, I see... You should be avoiding the use of subjects anyway (including ReplaySubject). Maybe that will help you avoid emitting all these "intermediate replay values" in the first place. Subjects should only be used to turn non-Rx code into Rx and to deal with dependency cycles (where the output of an Observable can affect its own input). - Daniel T.

1 Answer

Let's break down what is happening in your first example...

You could use Observable.from instead of the subjects and get the same results. The steps in the code are as follows:

  1. The code creates two ReplaySubjects and loads them up with events.
  2. The combineLatest operator subscribes to the first subject.
  3. That first subject immediately replays all of its values.
  4. Since the second subject hasn't been subscribed to yet, the combineLatest operator doesn't emit anything. Instead, it silently absorbs the values while always storing the "latest" one.
  5. The combineLatest operator then subscribes to the second subject.
  6. That subject then replays all of its values.
  7. Since the combineLatest operator has now received a next event from each of its sources, it emits the values emitted from the second source, combined with the latest (i.e., last) from the first source.
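If it helps to see those steps without the library in the way, here is a rough plain-Swift model of them. This is only a sketch, not how RxSwift implements the operator: ReplaySource and combineLatestModel are hypothetical names made up for illustration, and the model assumes each source replays its whole buffer synchronously on subscription and that the sources are subscribed to in argument order.

```swift
// Hypothetical stand-in for a ReplaySubject that already holds its events:
// subscribing delivers every buffered value synchronously, in order.
struct ReplaySource<Element> {
    let buffered: [Element]

    func subscribe(_ onNext: (Element) -> Void) {
        buffered.forEach(onNext)
    }
}

// Simplified two-source combineLatest (an assumption, not the RxSwift code):
// subscribe to the sources in order, remember the latest value of each,
// and emit only once both sources have produced a value.
func combineLatestModel<A, B>(_ first: ReplaySource<A>, _ second: ReplaySource<B>) -> [(A, B)] {
    var latestA: A?
    var latestB: B?
    var emitted: [(A, B)] = []

    func emitIfReady() {
        if let a = latestA, let b = latestB {
            emitted.append((a, b))
        }
    }

    // Steps 2-4: the first source replays everything; nothing is emitted yet.
    first.subscribe { value in
        latestA = value
        emitIfReady()
    }
    // Steps 5-7: every value replayed by the second source is paired with 4.
    second.subscribe { value in
        latestB = value
        emitIfReady()
    }
    return emitted
}

let numbers = ReplaySource(buffered: [1, 2, 3, 4])
let words = ReplaySource(buffered: ["bed", "book", "table"])

for pair in combineLatestModel(numbers, words) {
    print(pair)
}
// (4, "bed")
// (4, "book")
// (4, "table")
```

Swapping the arguments in this model gives ("table", 1) through ("table", 4), which matches what you saw when you reversed the order.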

Your replay subjects are inherently synchronous. They emit all the values to the subscriber immediately upon subscription.

In your last example code, since the last observable of the three emits only one value and only after the other two have emitted all of their values, you only see one output containing the latest from the previous two observables.
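The three-source case can be modeled the same way. Again, this is just an illustrative sketch under the assumption that every source emits synchronously on subscription and that subscription happens in argument order; combineLatestReplayed is a hypothetical helper, and the values are kept as strings only to keep the model short.

```swift
// Hypothetical n-ary model: each inner array is one source's synchronous replay.
func combineLatestReplayed(_ sources: [[String]]) -> [[String]] {
    var latest = [String?](repeating: nil, count: sources.count)
    var emitted: [[String]] = []

    for (index, values) in sources.enumerated() {   // subscribe in argument order
        for value in values {                       // source replays synchronously
            latest[index] = value
            let current = latest.compactMap { $0 }
            // Emit only when every source has produced at least one value.
            if current.count == sources.count {
                emitted.append(current)
            }
        }
    }
    return emitted
}

let output = combineLatestReplayed([
    ["1", "2", "3", "4"],
    ["bed", "book", "table"],
    ["42"],
])
print(output)   // [["4", "table", "42"]]
```

Only the last source can trigger emissions here, and it has a single value, so there is a single combined output.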