I have a source stream that never completes. I need an operator that completes its observers once an emitted value meets some criteria, with that value delivered to observers as the last item. At the moment, my code uses takeUntil() with a notifier observable that emits when a value from the source has met the criteria.

const source: Observable<any> = nonCompletingStream();
const conditionSatisfied: Observable<boolean> = source.pipe(
    map(val => meetsCriteriaToBeFinalValue(val)),
    skipWhile(meetsCriteria => meetsCriteria === false),
    shareReplay(),  // so that this emits after the fact
    take(1) // only needs to know if the criteria is met once
);
const example: Observable<any> = source.pipe(
    takeUntil(conditionSatisfied)
);

example.subscribe(
    val => { /* does stuff with individual values */ },
    err => { /* handles error */ },
    () => { /* this gets triggered one item too early */ }
);

The problem with this is that when the item that meets the criteria comes through (causing conditionSatisfied to emit), the observer on example completes before the "last value" can make it to the next callback. Is there a way to let the final value pass through to the observer before completing the stream?
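
To make the desired behaviour concrete, here is a minimal, library-free sketch of the semantics I'm after. It is not RxJS code: Listener, Source, takeUntilInclusive and fromArray are hypothetical names used only for illustration, and the array-backed source is a synchronous stand-in for nonCompletingStream().

```typescript
// Hypothetical, library-free sketch (not RxJS APIs).
interface Listener<T> {
  next(value: T): void;
  complete(): void;
}

// A source starts pushing values to a listener and returns a teardown function.
type Source<T> = (listener: Listener<T>) => () => void;

function takeUntilInclusive<T>(
  source: Source<T>,
  isFinal: (value: T) => boolean
): Source<T> {
  return (listener) => {
    let done = false;
    let teardown: (() => void) | null = null;

    // Run the upstream teardown at most once.
    const release = (): void => {
      if (teardown) {
        const t = teardown;
        teardown = null;
        t();
      }
    };
    const stop = (): void => {
      done = true;
      release();
    };

    teardown = source({
      next(value) {
        if (done) return;        // ignore anything after the final value
        listener.next(value);    // deliver first, so the final value gets through
        if (isFinal(value)) {
          stop();                // then stop listening upstream
          listener.complete();   // and complete downstream
        }
      },
      complete() {
        if (done) return;
        done = true;
        listener.complete();
      },
    });

    // The source may have finished synchronously, before `teardown` was assigned.
    if (done) release();
    return stop;
  };
}

// Synchronous stand-in for a stream (hypothetical helper).
const fromArray = <T>(values: T[]): Source<T> => (listener) => {
  for (const v of values) listener.next(v);
  listener.complete();
  return () => {};
};

const received: number[] = [];
let completions = 0;
takeUntilInclusive(fromArray([1, 2, 3, 4, 5]), (v) => v === 3)({
  next: (v) => received.push(v),
  complete: () => { completions += 1; },
});
// received is [1, 2, 3] and completions is 1: the matching value arrives before completion.
```

The key point is the ordering inside next: the value is forwarded before the completion is signalled, which is the opposite of what my takeUntil() setup does.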

Reading through the options, expand looks similar to what I need by letting values pass through before recursively pushing new observables to the stream. However, although it can add values to the stream, it doesn't look like it can complete the stream.

EDIT: changed title of question to be more general

2 Answers

0
votes

You can have conditionSatisfied emit the original value instead of a boolean, then concatenate it after the observable that completes based on the condition:

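import { concat } from 'rxjs';
import { filter, shareReplay, take, takeUntil } from 'rxjs/operators';

const source = nonCompletingStream();
// Emits the first value that meets the criteria (the value itself, not a boolean)
const conditionSatisfied = source.pipe(
    filter(val => meetsCriteriaToBeFinalValue(val)),
    shareReplay(),  // replays the matching value to the later concat subscription
    take(1)
);
const example = concat(
  // everything before the matching value
  source.pipe(
      takeUntil(conditionSatisfied)
  ),
  // then the matching value itself, followed by completion
  conditionSatisfied
);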

Demonstrated here https://stackblitz.com/edit/rxjs-qydhh3

0
votes

By turning your stream into a hot (connectable) observable, you can control its lifecycle directly.

const source = nonCompletingStream().pipe(publish());

Start the subscription by connecting the source observable:

const example = source.subscribe(...);
const subscription = source.connect();

And stop it by unsubscribing from the connection once a value meets the criteria:

source.pipe(filter(meetsCriteriaToBeFinalValue)).subscribe(() => subscription.unsubscribe());

Live example:

const { interval } = rxjs;
const { publish, filter } = rxjs.operators;

const source$ = interval(100).pipe(publish());
const stopCriteria = x => x === 3;

const example$ = source$.subscribe(console.log);

const subscription = source$.connect();

source$.pipe(
  filter(stopCriteria)
).subscribe(() => subscription.unsubscribe());
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>