3
votes

I have an Observable, source, that may emit items at unpredictable times. I'm trying to use it to build another Observable that reliably emits its values every 500ms.

Let's say that source emits values at these times:

  • 100ms - first item
  • 980ms - second item
  • 1020ms - third item
  • 1300ms - fourth item, etc.

I'd like to "smooth" this stream, so that I get outputs like:

  • 500ms - first item
  • 1000ms - second item
  • 1500ms - third item
  • 2000ms - fourth item

A naive approach might be to just add a delay in between emissions of source items. But, that won't create evenly spaced intervals, like I want.

I've tried various combinations of .timer(), .interval(), and .flatMap(), but nothing promising, yet.

3
What happens if the source emits 3 times in a 500ms interval, e.g. emits at 100, 200 and 300? What happens if it does not emit for 1000ms?Picci

3 Answers

1
votes

For a source emitting faster than your interval

zip your source with an interval of the required time span.

zip(source, interval(500)).pipe(
  map(([value, _]) => value)  // only emit the source value
)

enter image description here

zip emits the 1st item from source with the 1st item from interval, then the 2nd item from source with the 2nd item from interval and so on. If the output observable should only emit when interval emits, the Nth value from source has to arrive before the Nth value from interval.

Potential Problem: If your source emits slower than interval at some point (i.e. the Nth value from source arrives after the Nth value from interval) then zip will emit directly without waiting for the next time interval emits.

// the 5th/6th value from source arrive after the 5th/6th value from interval
                                              v    v
source:       -1--------2-3---4---------------5----6-----
interval:     -----1-----2-----3-----4-----5-----6-----7-
zip output:   -----1-----2-----3-----4--------5----6-----
                   ✓     ✓     ✓     ✓        ⚠️    ⚠️
// emits 5 and 6 don't happen when interval emits

For a source emitting at any rate

function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    defer(() => {
      let sourceCompleted = false;
      const queue = source.pipe(
        tap({ complete: () => (sourceCompleted = true) }),
        scan((acc, curr) => (acc.push(curr), acc), []) // collect all values in a buffer
      );
      return interval(period).pipe(
        withLatestFrom(queue), // combine with the latest buffer
        takeWhile(([_, buffer]) => !sourceCompleted || buffer.length > 0), // complete when the source completed and the buffer is empty
        filter(([_, buffer]) => buffer.length > 0), // only emit if there is at least on value in the buffer
        map(([_, buffer]) => buffer.shift()) // take the first value from the buffer
      );
    });
}

source.pipe(
  emitOnInterval(500)
)
// the 5th/6th value from source arrive after the 5th/6th value from interval
                                              v    v
source:       -1--------2-3---4---------------5----6-----
interval:     -----1-----2-----3-----4-----5-----6-----7-
output:       -----1-----2-----3-----4-----------5-----6-
                   ✓     ✓     ✓     ✓           ✓     ✓   
// all output emits happen when interval emits

https://stackblitz.com/edit/rxjs-qdlktm?file=index.ts

2
votes

I think you could try this:

const src$ = merge(
  timer(100).pipe(mapTo(1)),
  timer(980).pipe(mapTo(2)),
  timer(1020).pipe(mapTo(3)),
  timer(1300).pipe(mapTo(4))
);

src$
  .pipe(
    bufferTime(500),
    mergeAll()
  )
  .subscribe(console.log);

bufferTime is used in order to create a timer that will emit at constant intervals, irrespective of the emitted values. Then mergeAll is used to explode the array resulted from bufferTime.

StackBlitz demo.

1
votes

You can use a combination of combineLatest, interval and throttle - you add a second observable, interval with the time between calls you want (e.g. 500ms), so every 500ms your observable will emit (when used with combineLatest), now it will emit the values every 500ms and every time the original source emits, so you can add throttle in a pipe and that will cause the interval to throttle:

combineLatest([source, timer(5000)])
  .pipe(
    throttle(() => interval(5000)),
    tap(([value]) => {
      console.log("emitted", value, new Date().getSeconds());
    })
  )
  .subscribe();

(tap is not required here, just added to demonstrate)