2
votes

I have an Observable:

1----2----3----3----2----1-|

I use

.bufferCount(2, 1)

and the output is

--[1,2]-[2,3]-[3,3]-[3,2]-[2,1]-|

then I chain

.filter(twoEmissions => twoEmissions[0] !== twoEmissions[1])

which makes:

 --[1,2]-[2,3]-------[3,2]-[2,1]-|

then I chain

.map(twoEmissions => twoEmissions[1])

to leave only latest emissions, so my final output is:

-----2-----3------------2----1---|

My problem is that first emission is ommited int this case. I tried to use buffer() insted of bufferCount() with following closingNotifier:

bufferClosingNotifier = sourceObservable$
  .scan((acc, val, index) => (index), 0)
  .filter((index: number) => index === 0 || index > 0 && index % 2 !== 0)

It emits 1st, 3rd, 5th, 7th... emission but they are not overlapping.

How can I have bufferCount(2, 1) but emitting first emission of source observable anyway?

1
.startWith(undefined).bufferCount(2, 1) - cartant
thank you @cartant, that helped! - Jarosław Rewers

1 Answers

3
votes

If your requirement is the following:

Given a stream of values:

1----2----3----3----2----1-|

I only want succeeding unique values so that the ending stream is

1----2----3---------2----1-|

You can use .distinctUntilChanged() to get this behaviour:

public distinctUntilChanged(compare: function): Observable source

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.

If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.

If a comparator function is not provided, an equality check is used by default.

Rx.Observable.from([1,2,3,3,2,1])
  .distinctUntilChanged()
  .subscribe(val => console.log(val))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.js"></script>