I would like to perform a moving window calculation on a Flux and produce a Flux containing the calculated values, but I can't get my head around how to accomplish this.

As a simplified example, say I have a Flux of Integers and I want to produce a new Flux with the sum of every 3 successive Integers in this Flux. To illustrate:

First flux contains Integers from 1 to 8: {1, 2, 3, 4, 5, 6, 7, 8}

Result flux should contain sums: {1+2+3, 2+3+4, 3+4+5, 4+5+6, 5+6+7, 6+7+8}

I can easily produce the first Flux and derive a Flux of Fluxes, each containing 3 successive values, as follows:

Flux<Integer> f1 = Flux.range(1,8);
Flux<Flux<Integer>> f2 = f1.window(3,1);

I can also subscribe() to f2 and calculate the sums, but I can't figure out how to simultaneously publish those sums as a new Flux.

Am I missing something simple, or is this kind of thing actually hard to do?

1 Answer

You can use .reduce(Integer::sum) on the inner flux to perform the sum of the elements in the window, and .flatMap on the outer flux to merge those sums back into a single stream.

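Note that since .window is called with maxSize > skip (3 > 1), the windows overlap and a new one opens at every element, so the trailing windows will contain fewer than maxSize items. That is why the last two sums below are 7+8 and 8.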

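import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; // from the reactor-test artifact

Flux<Integer> sums = Flux.range(1, 8)                    // Flux<Integer>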
        .window(3, 1)                                    // Flux<Flux<Integer>>
        .flatMap(window -> window.reduce(Integer::sum)); // Flux<Integer>

StepVerifier.create(sums)
        .expectNext(6)  // 1+2+3
        .expectNext(9)  // 2+3+4
        .expectNext(12) // 3+4+5
        .expectNext(15) // 4+5+6
        .expectNext(18) // 5+6+7
        .expectNext(21) // 6+7+8
        .expectNext(15) // 7+8
        .expectNext(8)  // 8
        .verifyComplete();
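
To make the trailing partial windows easier to see, here is a minimal plain-Java model of overlapping windows with skip = 1. It does not use Reactor at all; the class `SlidingSums`, the method `slidingSums`, and the `fullOnly` flag are hypothetical names introduced only for this sketch. With `fullOnly` set, it yields just the six sums asked for in the question.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingSums {

    // Models overlapping windows with skip = 1: a new window opens at every
    // element and holds at most maxSize items. This is an illustration of the
    // semantics only, not Reactor's implementation.
    static List<Integer> slidingSums(List<Integer> source, int maxSize, boolean fullOnly) {
        List<Integer> sums = new ArrayList<>();
        for (int start = 0; start < source.size(); start++) {
            // The window is cut short when the source ends before it fills up.
            int end = Math.min(start + maxSize, source.size());
            if (fullOnly && end - start < maxSize) {
                break; // every later window would be shorter still
            }
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += source.get(i);
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(slidingSums(source, 3, false)); // [6, 9, 12, 15, 18, 21, 15, 8]
        System.out.println(slidingSums(source, 3, true));  // [6, 9, 12, 15, 18, 21]
    }
}
```

In Reactor itself, one option might be to use .buffer(3, 1), which emits each group as a List<Integer>, filter on the list size, and then sum each list. That is a suggestion under the same assumptions, not something verified here.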