4
votes

I have an Rxjs observable (stream in the code below)that emits observables (subjOne and subjTwo). Each of the inner observables can emit their own values in any order and at any time. My task is to capture values from subjOne until subjTwo emits its first value.

const subjOne = new Subject();
const subjTwo = new Subject();

const stream = Observable.create(observer => {
    observer.next(subjOne);
    observer.next(subjTwo);
});

stream
    .someOperator(subj => subj)
    .subscribe(value => console.log('Value: ', value));

Example 1: subjOne emits values 1 and 2, then subjTwo emits value 3, then subjOne emits 4. The output should be: 1, 2, 3.

Example 2: subjTwo emits 1, then subjOne emits 2. The output should be 1.

switchMap doesn't fit here because it drops the values from subjOne as soon as subjTwo is emitted from stream. Any ideas about how to achieve that? Thanks.

UPDATE: In my actual case there are not only two inner observables -- subjOne and subjTwo -- but a constant stream of them, so manually hardcoding subjOne.takeUntil(subjTwo) is not a viable option.

3

3 Answers

4
votes

I think this does what you want:

// scan to let us keep combining the previous observable
// with the next observable
source
  .scan((current, next) => {
    // takeUntil to stop current when next produces
    const currentUntil = current.takeUntil(next);
    // now merge current with next
    return currentUntil.merge(next)
  }, Rx.Observable.empty())
  // switch to the most recent version of the combined inner observables
  .switch();

Note this will only work correctly if the inner observables are hot. If they are cold observables, it will take a bit more code to achieve.

0
votes

It sounds like you are looking for takeUntil.

takeUntil listens on one stream until the second stream emits. So for your examples

// Emits every second
const source = Observable.timer(0, 1000);

// Emits once after 3 seconds
const canceller = Observable.timer(3000);

source
  // Takes from the source until this stream emits
  .takeUntil(canceller)
  .subscribe({
    next: x => console.log(`Value ${x}`),
    complete: () => console.log('Done')
  });
// Value 0 -> 0 seconds
// Value 1 -> 1 seconds
// Value 2 -> 2 seconds
// Done -> 3 seconds
0
votes

You could get your two subjects out and then combine them with takeUntil :

const subjOne = new Subject();
const subjTwo = new Subject();

const stream = Observable.create(observer => {
    observer.next(subjOne);
    observer.next(subjTwo);
});

const first$ = stream.take(1).flatMap(x=>x)
const second$ = stream.skip(1).take(1).flatMap(x=>x)

first$.takeUntil(second$).subscribe(...)