0
votes

Context: I've got many ConnectableObservables, almost all of which have a replay count of 1 or more. There are many observers of the observables subscribing and unsubscribing at any given time.

What I want: In many cases, an observer that subscribes to one of these observables doesn't care about values emitted earlier, which it would otherwise receive through the observable's replay mechanism. It is only interested in data emitted after the moment of subscription.

const observable = Rx.Observable
    .interval(100)
    .take(4)
    .publishReplay(3);

observable.connect();

Problem: As far as I can tell, an observer that subscribes to the observable has no way of knowing whether a given value was emitted before or after the moment of subscription.

observable.subscribe(x => console.log('observed', x));

setTimeout(() => 
    observable.subscribe(y => console.log('delayed observed', y)), 
    400
);

The code above will output:

// => observed 0
// => observed 1
// => observed 2
// => delayed observed 0 **don't care**
// => delayed observed 1 **don't care**
// => delayed observed 2 **don't care**
// => observed 3
// => delayed observed 3

In this hypothetical situation, the delayed observer is only interested in data emitted after the moment of subscription; in this case, 3.

I've scoured the RxJS 5 reference docs and can't seem to find a silver-bullet operator to accomplish what I'm after. Any ideas?

2
What is the actual problem you're trying to solve with this? Could you provide a less abstract example? - jonrsharpe
So who needs the replay? Can't the delayed observers subscribe to a version of the stream that is not replayed? - Ptival
What you want is called Hot Observables. Check out this doc: github.com/Reactive-Extensions/RxJS/blob/master/doc/… - John Siu

2 Answers

1
votes

You can use .skipUntil(Rx.Observable.timer(0)). The replayed elements are delivered synchronously at the moment of subscription, whereas timer(0) only fires asynchronously on a later tick. skipUntil therefore drops everything that arrives during the synchronous replay and lets through only the values emitted afterwards.

This code would produce the result you want:

const observable = Rx.Observable
    .interval(100)
    .take(4)
    .publishReplay(3);

observable.subscribe(x => console.log('observed', x));

setTimeout(() => 
    observable
        .skipUntil(Rx.Observable.timer(0))
        .subscribe(y => console.log('delayed observed', y)), 
    400
);

observable.connect();
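To make the "replay is synchronous" point concrete, here is a minimal sketch in plain JavaScript with no RxJS involved. TinyReplay and subscribeLiveOnly are hypothetical names made up for illustration, not part of any library, and the sketch swaps the timer for a simple flag: anything delivered while the subscribe call is still running must be replayed backlog, so it is ignored.

```javascript
// Hypothetical stand-in for a replaying subject; NOT the RxJS implementation.
class TinyReplay {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.listeners = [];
  }

  // Push a live value to current listeners and remember it for late subscribers.
  next(value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.listeners.slice().forEach(fn => fn(value));
  }

  // Replay the buffer synchronously, then register for live values.
  subscribe(fn) {
    this.buffer.forEach(value => fn(value));
    this.listeners.push(fn);
  }
}

// Hypothetical helper: drop everything delivered during the subscribe call,
// which is exactly the replayed backlog.
function subscribeLiveOnly(source, fn) {
  let subscribing = true;
  source.subscribe(value => {
    if (!subscribing) fn(value);
  });
  subscribing = false;
}

const source = new TinyReplay(3);
const seen = [];
[0, 1, 2].forEach(v => source.next(v)); // emitted before anyone subscribes
subscribeLiveOnly(source, v => seen.push(v));
source.next(3); // emitted after subscription
console.log(seen); // [ 3 ]
```

The skipUntil(timer(0)) version relies on the same observation, but it opens the gate on a later tick instead of right after subscribe returns.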
0
votes

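Could you do something like this? Share the source with publish() so that both streams see the same emissions. A plain interval is cold, so subscribing to it directly would start a separate count from 0.

const observableNoReplay = Rx.Observable
    .interval(100)
    .take(4)
    .publish(); // hot and shared, no replay

const observable = observableNoReplay
    .publishReplay(3); // same emissions, replays the last 3

observable.connect();         // attach the replay buffer to the shared source first
observableNoReplay.connect(); // then start the interval

Observers that want the backlog subscribe to observable, and observers that only care about new data subscribe to observableNoReplay. You could subscribe to whichever one you need at the time and not have to worry about any sort of silver bullet.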