
I would like to integrate a subject into a pipe such that later operators can subscribe to earlier observables.

Context: I want to implement a more sophisticated "debounce" where only certain events can block certain other events. For that, first I want to reimplement "debounce".

This is what I have in mind:

const debounced = myEventStream.pipe(
    useSubject(futureEvents => pipe(
        flatFilter(
            myEvent => race(
                interval(10).pipe(map(x => true)),
                futureEvents.pipe(/* filter(...), */ map(x => false))
            )
        )
    )),
);

useSubject would provide the subsequent stages of the pipe with an observable of future events. Assuming a flatFilter operator existed, an event would pass if the interval fires first and be dropped if a newer event arrives first.

Is such a useSubject reasonable, and can it work? Does something like it already exist?


1 Answer


It works like a charm:

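import {
    interval, race, MonoTypeOperatorFunction, Observable,
    OperatorFunction, PartialObserver, Subject
} from 'rxjs';
import { filter, map, mergeMap, take, tap } from 'rxjs/operators';

// Asynchronous filter: an event passes if the first value of its predicate observable is true.
function mergeFilter<T>(
    predicate: (arg: T) => Observable<boolean>
): MonoTypeOperatorFunction<T> {
    return mergeMap(evt =>
        predicate(evt).pipe(
            take(1),
            filter(pass => pass),
            map(() => evt)
        )
    );
}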

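// Builds an operator from fn, handing fn a subject that mirrors the input stream.
function useSubject<T, O>(
    fn: (o: Observable<T>) => OperatorFunction<T, O>
): OperatorFunction<T, O> {
    return function(input: Observable<T>): Observable<O> {
        const subject = new Subject<T>();
        const observer: PartialObserver<T> = subject;
        return input.pipe(
            // tap runs before fn's operator, so subscriptions made for earlier
            // events see each new event, while the new event does not see itself
            tap(observer),
            fn(subject)
        );
    };
}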

function debounceTime<T>(time: number) {
    return useSubject<T, T>(futureEvents =>
        mergeFilter(myEvent =>
            race(
                // allow the event once `time` ms have passed
                interval(time).pipe(map(x => true)),
                // discard the event when a newer event arrives first
                futureEvents.pipe(map(x => false))
            )
        )
    );
}

This is still a lot shorter than the original RxJS debounceTime implementation, but its performance might be worse, since every event sets up its own timer and its own subscription to the subject.
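
To sanity-check an operator like this, the intended rule can also be written as a pure function over timestamped events. The following is only a sketch of the semantics, not RxJS code. TimedEvent, modelDebounce and the blocks predicate are hypothetical names made up for illustration. It assumes that an event arriving exactly at the end of the window does not block, and it ignores stream completion. The blocks predicate plays the role of the commented-out filter(...) in the question, for the case where only certain events should block certain others.

```typescript
// Hypothetical types and names for illustration; none of these come from RxJS.
interface TimedEvent<T> {
    at: number;  // arrival time in ms
    value: T;
}

// Pure model of the rule: an event is emitted unless a later event that
// `blocks` it arrives less than `windowMs` after it. With the default
// predicate every later event blocks, which corresponds to a plain debounce.
// Assumes `events` is sorted by arrival time.
function modelDebounce<T>(
    events: TimedEvent<T>[],
    windowMs: number,
    blocks: (later: T, earlier: T) => boolean = () => true
): T[] {
    return events
        .filter((e, i) =>
            !events.slice(i + 1).some(
                later => later.at - e.at < windowMs && blocks(later.value, e.value)
            )
        )
        .map(e => e.value);
}

const sample: TimedEvent<string>[] = [
    { at: 0, value: "a" },
    { at: 4, value: "a" },
    { at: 6, value: "b" },
    { at: 30, value: "a" },
];

// Plain debounce: any newer event within 10 ms blocks.
const plain = modelDebounce(sample, 10);
// Selective variant: only a newer event with the same value blocks.
const selective = modelDebounce(sample, 10, (later, earlier) => later === earlier);
```

Here plain evaluates to ["b", "a"] and selective to ["a", "b", "a"]. Feeding the same timestamps through the operator under virtual time and comparing the results against this model should show whether a more sophisticated variant behaves as intended.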