3
votes

I would like to emit all original values from an RxJS stream, and then emit a summary upon completion.

Reduce stops the original values from emitting. Scan emits each total rather than the original values.

Here is my hacky solution:

let total = {
  total: 0
};

Rx.Observable.range(1, 3)
  .do(val => {
    total.total += val;
  })
  .concat(Rx.Observable.of(total))
  .subscribe(
    value => {
      console.log('Next:', value)
    }
  );

// Next: 1
// Next: 2
// Next: 3
// Next: { total: 6 }

What is a simple way to do this with pure RxJS streams?

3

3 Answers

4
votes

Use multicast

Rx.Observable.range(1, 3)
 .multicast(new Rx.Subject(), (shared)=> {
    return Rx.Observable.merge(shared, shared.reduce((acc, x)=>acc+x,0))
 })
.subscribe(x=>console.log(x))
2
votes

As an alternative, you could avoid using share() and making two Observable chains and make just a single chain:

Observable.range(1, 3)
    .concat(Observable.of(null))
    .scan((obj, curr) => {
        if (curr) {
            obj.acc.push(curr);
        }
        obj.curr = curr;
        return obj;
    }, { acc: [], curr: 0 })
    .map(obj => obj.curr === null
        ? { total: (obj.acc.reduce((acc, curr) => acc + curr, 0)) }  // count total
        : obj.curr  // just return current item
    )
    .subscribe(console.log);

This prints the result you're expecting:

1
2
3
{ total: 6 }

Even though using share() looks very simple be aware that it in fact you subscribe to the source Observable twice. In practise maybe it's not a problem for you depending on what source Observable you'll use.

Try this and see that each number is printed twice:

let source = Observable.range(1, 3).do(console.log).share();
1
votes

How about?

let source = Observable.range(1, 3).share();

let totalOb = source
    .reduce((total, value) => total + value, 0);

source
    .concat(totalOb)
    .subscribe( value => console.log(`Next: ${value}`) );

Output:

Next: 1
Next: 2
Next: 3
Next: 6

You can use throw and catch to separate data and summary.

let source = Observable.range(1, 3).share();

let totalOb = source
    .reduce((total, value) => total + value, 0)
    .mergeMap(total => Observable.throw(total));

source
    .concat(totalOb)
    .subscribe(
        value => console.log(`Next: ${value}`),
        value => console.log(`Total: ${value}`)
    );

Output:

Next: 1
Next: 2
Next: 3
Total: 6