3
votes

Suppose I have some Observable which may have some arbitrarily long sequence of events at the time I subscribe to it but which may also continue to emit events after I subscribe. I am interested only in those events from the time at which I subscribe and later. How do I just get the latest events?

In this example I use a ReplaySubject as an artificial source to illustrate the question. In practice this would be some arbitrary Observable.

let observable = ReplaySubject<Int>.createUnbounded()

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

_ = observable.subscribe(onNext: {
    print($0)
})
observable.onNext(5)
observable.onNext(6)
observable.onNext(7)

Produces the output: 1 2 3 4 5 6 7

What I really want is only events from the time of subscription onwards. i.e. 4 5 6 7

I can use combineLatest with some other dummy Observable:

let observable = ReplaySubject<Int>.createUnbounded()

observable.onNext(1)
observable.onNext(2)
observable.onNext(3)
observable.onNext(4)

_ = Observable.combineLatest(observable, Observable<Int>.just(42)) { value, _ in value }
    .subscribe(onNext: {
    print($0)
})
observable.onNext(5)
observable.onNext(6)
observable.onNext(7)

which produces the desired output 4 5 6 7

How can I produce a similar result without artificially introducing another arbitrary Observable?

I have tried a number of things including combineLatest with an array consisting of just one observable, but that emits the complete sequence, not just the latest. I know I could use PublishSubject but I am just using ReplaySubject here as an illustration.

2
In general, Observables don't "have" events, they generate events. The only reason you have having the problem you describe is precisely because you are using a ReplaySubject and you have set it up to store and replay every event each time it is subscribed to. You say you are "using ReplaySubject here as an illustration", but is precisely the reason, and the only reason, you are having the problem you describe. So the answer is, don't use ReplaySubject.createUnbounded() and the "problem" will go away. Either that, or adjust the question to show the real problem you are having.Daniel T.
@DanielT. I take your point, but nevertheless, combineLatest with a second observable produces the desired result. I’m just curious why I can’t find an equivalent forumulation of “latest” for a single observableDale

2 Answers

1
votes

By default, an observable will call its generator for every subscriber and emit all of the values produced by that generator. So for example:

let obs = Observable.create { observer in 
    for each in [1, 2, 3, 5, 7, 11] { 
        observer.onNext(each)
    }
    observer.onCompleted()
}

(Note that the above is the implementation of Observable.from(_:))

Every time something subscribes to obs the closure is called and all 6 next events will be received. This is what's known as a "cold" observable, and again it's the default behavior. Assume an Observable is cold unless you know otherwise.

There is also the concept of a "hot" observable. A hot observable doesn't call its generator function when something subscribes to it.

Based on your question, and your subsequent comment, it sounds like you want to know how to make a cold observable hot... The fundamental way is by calling .multicast on it (or one of the operators that use its implementation like publish(), replay(_:) or replayAll().) There is also a special purpose operator called .share() that will "heat up" an observable and keep it hot until all subscribers unsubscribe to it (then it will be cold again.) And of course, Subjects are considered hot because they don't have a generator function to call.

Note however, that many observables have synchronous behavior, this means that they will emit all their values as soon as something subscribes and thus will have already completed before any other observer (on that thread) has a chance to subscribe.

Some more examples... .interval(_:scheduler:) is a cold observable with async behavior. Let's say you have the following:

let i = Observable<Int>.interval(.seconds(3), scheduler: MainScheduler.instance)
i.subscribe(onNext: { print($0, "from first") })
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    i.subscribe(onNext: { print($0, "from second") })
}

What you will find is that each observer will get it's own independent stream of values (both will start with 0) because the generator inside interval is called for both observers. So you will see output like:

0 from first
1 from first
0 from second
2 from first
1 from second
3 from first
2 from second

If you multicast the interval you will see different behavior:

let i = Observable<Int>.interval(.seconds(3), scheduler: MainScheduler.instance)
    .publish()
i.subscribe(onNext: { print($0, "from first") })
i.connect()
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    i.subscribe(onNext: { print($0, "from second") })
}

The above will produce:

0 from first
1 from first
1 from second
2 from first
2 from second
3 from first
3 from second

(Note that "second" started with 1 instead of 0.) The share operator will work the same way in this case except you don't have to call connect() because it does it automatically.

Lastly, watch out. If you publish a synchronous observable, you might not get what you expect:

let i = Observable.from([1, 2, 3, 5])
    .publish()
i.subscribe(onNext: { print($0, "from first") })
i.connect()
i.subscribe(onNext: { print($0, "from second") })

produces:

1 from first
2 from first
3 from first
5 from first

Because all 5 events (the four next events and the completed event) emit as soon as connect() is called before the second observer gets a chance to subscribe.

An article that might help you is Hot and Cold Observables but it's pretty advanced...

0
votes

Why not simply use a publish subject like this? Isn't this the desired output? Publish Subjects only emits the elements after it's subscribed. And that's the whole purpose of it.

    let observable = PublishSubject<Int>()
    observable.onNext(1)
    observable.onNext(2)
    observable.onNext(3)
    observable.onNext(4)

    _ = observable.subscribe(onNext: {
        print($0)
    })
    observable.onNext(5)
    observable.onNext(6)
    observable.onNext(7)
}

If you don't want to use a subject you can share the observable and add a 2nd subscriber like this,

    let observable = ReplaySubject<Int>.createUnbounded()
    observable.onNext(1)
    observable.onNext(2)
    observable.onNext(3)
    observable.onNext(4)
    
    let shared = observable.share()
    
   // this will print full sequence
    shared.subscribe(onNext: {
        print("full sequence: \($0)")
    }).disposed(by: disposeBag)


    // this will only print new events
    shared.subscribe(onNext: {
        print("shared sequence: \($0)")
    }).disposed(by: disposeBag)

    // new events
    observable.onNext(5)
    observable.onNext(6)
    observable.onNext(7)

Observables are lazy, pull driven sequences. Without your first subscription stream won't even start. Once started, by sharing it, you can subscribe only to the new events.