I have a source stream that never completes, and I need an operator that completes its observers once an emitted value meets some criteria, with that value delivered to observers as the last item. At the moment, my code uses takeUntil() with a notifier observable that emits when a value from the source meets the criteria.
import { Observable } from 'rxjs';
import { map, shareReplay, skipWhile, take, takeUntil } from 'rxjs/operators';

const source: Observable<any> = nonCompletingStream();
const conditionSatisfied: Observable<boolean> = source.pipe(
  map(val => meetsCriteriaToBeFinalValue(val)),
  skipWhile(meetsCriteria => meetsCriteria === false),
  shareReplay(), // so that this emits after the fact
  take(1) // only needs to know if the criteria is met once
);
const example: Observable<any> = source.pipe(
  takeUntil(conditionSatisfied)
);
example.subscribe(
  val => { /* does stuff with individual values */ },
  err => { /* handles error */ },
  () => { /* this gets triggered one item too early */ }
);
The problem is that when the item that meets the criteria comes through (causing conditionSatisfied to emit), the observer on example completes before that "last value" can reach its next callback. Is there a way to let the final value pass through to the observer before completing the stream?
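To make the behaviour concrete, here is a minimal reproduction. It is only a sketch under a few assumptions: a Subject stands in for nonCompletingStream(), the criterion `n >= 3` is a made-up placeholder for meetsCriteriaToBeFinalValue, and shareReplay() is left out to keep it short.

```typescript
import { Subject } from 'rxjs';
import { map, skipWhile, take, takeUntil } from 'rxjs/operators';

// Assumption: a Subject stands in for the real non-completing source.
const source = new Subject<number>();

// Assumption: placeholder criterion, not the real meetsCriteriaToBeFinalValue.
const meetsCriteria = (n: number): boolean => n >= 3;

// Emits once, as soon as a source value satisfies the criterion.
const conditionSatisfied = source.pipe(
  map(meetsCriteria),
  skipWhile(ok => ok === false),
  take(1)
);

const received: number[] = [];
let completed = false;

source.pipe(takeUntil(conditionSatisfied)).subscribe({
  next: v => received.push(v),
  complete: () => { completed = true; },
});

[1, 2, 3, 4].forEach(v => source.next(v));

// The value 3 triggers completion but is never delivered to `next`.
console.log(received, completed);
```

As far as I can tell, this happens because takeUntil subscribes to its notifier before it subscribes to the source, so the notifier sees the matching value first and completes the output.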
Reading through the available operators, expand looks similar to what I need, since it lets values pass through before recursively pushing new observables onto the stream. However, although it can add values to the stream, it doesn't look like it can complete the stream.
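For reference, the behaviour I'm after could be sketched as a hand-rolled operator like the one below. The name takeUntilInclusive is hypothetical (it is not part of RxJS), and the Subject and the `n >= 3` criterion are again stand-ins; I'd prefer a built-in operator if one exists.

```typescript
import { Observable, Subject } from 'rxjs';

// Hypothetical helper, not an RxJS API: forwards every value and completes
// immediately after forwarding the first value for which isLast returns true.
function takeUntilInclusive<T>(isLast: (value: T) => boolean) {
  return (source: Observable<T>): Observable<T> =>
    new Observable<T>(subscriber => {
      const subscription = source.subscribe({
        next: value => {
          subscriber.next(value); // deliver the value first...
          if (isLast(value)) {
            subscriber.complete(); // ...then complete if it was the final one
          }
        },
        error: err => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      // Returned as teardown so the source is unsubscribed on completion.
      return subscription;
    });
}

// Assumption: a Subject stands in for the real non-completing source.
const source = new Subject<number>();
const received: number[] = [];
let completed = false;

source.pipe(takeUntilInclusive((n: number) => n >= 3)).subscribe({
  next: v => received.push(v),
  complete: () => { completed = true; },
});

[1, 2, 3, 4].forEach(v => source.next(v));

// Here 3 is delivered as the last value and 4 is never seen.
console.log(received, completed);
```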
EDIT: changed title of question to be more general