5
votes

Is there an operator in RxJS that would allow me to buffer items and let them out one by one whenever a signal observable fires? Sort of like bufferWhen, but instead of dumping the whole buffer on each signal it would dump a certain number per signal. It could even dump the number that gets emitted by the signal observable.

Input observable:  >--a--b--c--d--|
Signal observable: >------1---1-1-|
Count in buffer:   !--1--21-2-121-|
Output observable: >------a---b-c-|

2 Answers

6
votes

Yes, you can use zip to do what you want:

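    // Buffered values: `from` emits them all synchronously on subscription.
    const input = Rx.Observable.from(["a", "b", "c", "d", "e"]);
    // Each emission of `signal` releases exactly one buffered value.
    const signal = new Rx.Subject();
    // zip pairs the nth input with the nth signal; the selector keeps only the input.
    const output = Rx.Observable.zip(input, signal, (i, s) => i);
    output.subscribe(value => console.log(value));
    signal.next(1); // logs "a"
    signal.next(1); // logs "b"
    signal.next(1); // logs "c"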
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

In fact, zip is used as an example in this GitHub issue that pertains to buffering.
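
To see why zip acts as a one-per-signal gate, here is a minimal, dependency-free sketch of its pairing rule: each source has its own queue, and a value is released only when both queues are non-empty. The names `makeGate`, `pushInput`, `pushSignal` and `pending` are hypothetical and are not part of RxJS; this models the behaviour rather than reproducing the library's implementation.

```javascript
// Hypothetical, dependency-free model of how zip pairs two sources.
// It is not RxJS code; it only illustrates the queueing rule.
function makeGate(onOutput) {
  const inputs = [];  // values waiting for a signal
  const signals = []; // signals waiting for a value

  function drain() {
    // Release one input per signal while both queues are non-empty.
    while (inputs.length > 0 && signals.length > 0) {
      signals.shift();
      onOutput(inputs.shift());
    }
  }

  return {
    pushInput(value) { inputs.push(value); drain(); },
    pushSignal(signal) { signals.push(signal); drain(); },
    pending() { return inputs.length; },
  };
}

const released = [];
const gate = makeGate(value => released.push(value));
["a", "b", "c", "d", "e"].forEach(value => gate.pushInput(value));
gate.pushSignal(1);
gate.pushSignal(1);
gate.pushSignal(1);
console.log(released);       // [ 'a', 'b', 'c' ]
console.log(gate.pending()); // 2
```

One consequence of this rule is that signals arriving while nothing is buffered are queued as well, so a later input passes straight through. zip behaves the same way, which may or may not be what you want.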

If you want to use the signal's emitted value to determine how many buffered values are to be released, you could do something like this:

    const input = Rx.Observable.from(["a", "b", "c", "d", "e"]);
    const signal = new Rx.Subject();
    const output = Rx.Observable.zip(
      input,
      // Expand each count n into n emissions, so zip releases n values.
      signal.concatMap(count => Rx.Observable.range(0, count)),
      (i, s) => i
    );
    output.subscribe(value => console.log(value));
    signal.next(1); // logs "a"
    signal.next(2); // logs "b", then "c"
    signal.next(1); // logs "d"
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
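
The count-expansion step can also be looked at in isolation. The sketch below is plain JavaScript on arrays, not RxJS: it turns each count n into n single-release tokens (the role played by `concatMap` with `range` above) and then pairs tokens with inputs one-to-one. `releaseByCount` is a hypothetical helper name used only for illustration.

```javascript
// Hypothetical helper: model "release `count` buffered values per signal"
// on plain arrays instead of observables.
function releaseByCount(inputs, counts) {
  // Expand each count n into n tokens: [1, 2, 1] -> [0, 0, 1, 0].
  const tokens = counts.flatMap(n => Array.from({ length: n }, (_, i) => i));
  // Pair tokens with inputs one-to-one and stop at the shorter of the two.
  return tokens.slice(0, inputs.length).map((_, index) => inputs[index]);
}

console.log(releaseByCount(["a", "b", "c", "d", "e"], [1, 2, 1]));
// [ 'a', 'b', 'c', 'd' ]
```

The token values themselves are discarded, just as the zip selector `(i, s) => i` ignores `s`; only the number of tokens matters.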
1
votes

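window can be used to split the input's timeline at each signal, and takeLast holds each window's values back until that window closes, that is, until the next signal fires or the input completes. takeLast(100) keeps at most the last 100 values of a window.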

    let signal = Rx.Observable.interval(1000).take(4);

    let input = Rx.Observable.interval(300).take(10).share();

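    let output = input
        .do(value => console.log(`input = ${value}`))
        // Start a new window (sub-observable) each time signal emits.
        .window(signal)
        .do(() => console.log(`*** signal : end OLD and start NEW subObservable`))
        // takeLast only emits when its window completes, so values are held until then.
        .mergeMap(subObservable => {
            return subObservable.takeLast(100);
        })
        .share();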

    output.subscribe(value => console.log(`    output = ${value}`));

    Rx.Observable.merge(input.mapTo(1), output.mapTo(-1))
        .scan((count, diff) => {
            return count + diff;
        }, 0)
        .subscribe(count => console.log(`            count = ${count}`));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

Result:

22:28:37.971 *** signal : end OLD and start NEW subObservable  
22:28:38.289 input = 0  
22:28:38.292             count = 1  
22:28:38.575 input = 1  
22:28:38.576             count = 2  
22:28:38.914 input = 2  
22:28:38.915             count = 3  
            <signal received>
22:28:38.977     output = 0  
22:28:38.979             count = 2  
22:28:38.980     output = 1  
22:28:38.982             count = 1  
22:28:38.984     output = 2  
22:28:38.986             count = 0  
22:28:38.988 *** signal : end OLD and start NEW subObservable  
22:28:39.175 input = 3  
22:28:39.176             count = 1  
22:28:39.475 input = 4  
22:28:39.478             count = 2  
22:28:39.779 input = 5  
22:28:39.780             count = 3  
            <signal received>
22:28:39.984     output = 3  
22:28:39.985             count = 2  
22:28:39.986     output = 4  
22:28:39.988             count = 1  
22:28:39.989     output = 5  
22:28:39.990             count = 0  
22:28:39.992 *** signal : end OLD and start NEW subObservable  
22:28:40.075 input = 6  
22:28:40.077             count = 1  
22:28:40.377 input = 7  
22:28:40.378             count = 2  
22:28:40.678 input = 8  
22:28:40.680             count = 3  
22:28:40.987 input = 9  
22:28:40.990             count = 4  
            <input completed>
22:28:40.992     output = 6  
22:28:40.993             count = 3  
22:28:40.995     output = 7  
22:28:40.996             count = 2  
22:28:40.998     output = 8  
22:28:40.999             count = 1  
22:28:41.006     output = 9  
22:28:41.007             count = 0
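
The batching visible in the log above can be modelled without RxJS. The sketch below is a simplified, synchronous model under the assumption that events arrive in a known order: values collect in the current window and are flushed together when a signal closes it, or when the input completes. `flushOnSignal` and the event shape are hypothetical names chosen for this illustration.

```javascript
// Hypothetical model of window(signal) followed by takeLast(n):
// values collect in the current window and are flushed together
// when a signal closes it or when the input completes.
function flushOnSignal(events, keepLast = 100) {
  const batches = [];
  let current = [];
  const flush = () => {
    batches.push(current.slice(-keepLast)); // keep at most the last n values
    current = [];
  };
  for (const event of events) {
    if (event.type === "input") {
      current.push(event.value);
    } else {
      flush(); // a signal ends the old window and starts a new one
    }
  }
  flush(); // input completion closes the final window
  return batches;
}

const input = value => ({ type: "input", value });
const signal = { type: "signal" };
const events = [
  input(0), input(1), input(2), signal,
  input(3), input(4), input(5), signal,
  input(6), input(7), input(8), input(9),
];
console.log(JSON.stringify(flushOnSignal(events)));
// [[0,1,2],[3,4,5],[6,7,8,9]]
```

In this model, as in the log, each signal releases the whole window rather than a fixed number of items, so the behaviour is closer to a buffer flushed per signal than to one item per signal.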