2
votes

I'm building an observable from an event that outputs text lines. The lines form records, which are separated by two empty lines. For example:

xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx


xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx


xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx

I'd like the observable to split the output by two new lines, so the subscriber gets the data in chunks.

How can I accomplish this using RxJS? It doesn't seem to have a function to perform this job.

I could subscribe to the observable, accumulate the values and re-emit them myself, but I believe there is a more elegant solution that I'm not seeing.

1
It is best to express your requirements as a marble diagram, which is basically your inputs and expected outputs on a timeline. Also, do you mean that your observable only outputs one line at a time, so that an empty line is just a \n? - user3743222

1 Answer

2
votes

A combination of buffer and scan, driven by a sampling observable, could also work. Basically, you accumulate inputs (i.e. lines) in a buffer and release that buffer every time the sampling observable (sample$ below) emits a value. You then arrange for that observable to emit a value every time it detects two consecutive \n, which can be achieved with scan. Note that this requires your source$ to be a hot observable, because it is subscribed to twice.

So you could start from the following code and let us know whether it worked in the end:

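// Assumption: an empty line arrives as '' or '\n'
var is_blank = function (line) { return line === '' || line === '\n'; };
var detect_two_lines = function (acc, new_line) {
  var last_line = acc.arr_lines[acc.arr_lines.length - 1];
  // if new_line and the last line of acc are both blank, reset and flag the separator
  if (is_blank(new_line) && is_blank(last_line)) {
    return {arr_lines: [], found: true};
  }
  // otherwise keep accumulating
  return {arr_lines: acc.arr_lines.concat([new_line]), found: false};
};
var identity = function (x) { return x; };
// sample$ emits true each time two consecutive blank lines are seen
var sample$ = source$.scan(detect_two_lines, {arr_lines: [], found: false})
                     .pluck('found')
                     .filter(identity);
// each buffer holds the lines received since the previous separator (it may still contain blank lines)
var results$ = source$.buffer(sample$);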
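
To check the separator rule in isolation before wiring it into scan and buffer, it can help to run it over a plain array first. The sketch below uses no RxJS at all; isBlank and splitRecords are hypothetical helper names, and it assumes an empty line arrives as '' or '\n'.

```javascript
// Hypothetical helpers, not part of RxJS.
// Assumption: an empty line arrives as '' or '\n'.
function isBlank(line) {
  return line === '' || line === '\n';
}

// Groups an array of lines into records, treating two consecutive
// blank lines as the record separator.
function splitRecords(lines) {
  var records = [];
  var current = []; // lines of the record being built
  lines.forEach(function (line) {
    var last = current[current.length - 1];
    if (isBlank(line) && current.length > 0 && isBlank(last)) {
      current.pop(); // drop the first blank line of the separator
      if (current.length > 0) records.push(current);
      current = [];
    } else {
      current.push(line); // a single blank line stays part of the record
    }
  });
  if (current.length > 0) records.push(current); // trailing record with no separator
  return records;
}

console.log(JSON.stringify(splitRecords(['x1', 'x2', '', '', 'x3', 'x4'])));
// prints [["x1","x2"],["x3","x4"]]
```

Unlike the buffer-based version, this strips the separator lines from each record, so the two are not drop-in equivalents; it is only meant as a reference for what the chunks should look like.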