A Java 8 Stream cannot be reused once it has been consumed. That raises the question of how to build a sliding-window Flux from a stream in order to compute a relationship such as x(i)*x(i-1).
The following code is based on the idea of a shift operator. I shift the first stream with skip(1) to create a second stream.
Flux<Integer> primary = Flux.fromStream(IntStream.rangeClosed(1, 10).boxed());
Flux<Integer> secondary = primary.skip(1);
primary.zipWith(secondary)
.map(t -> t.getT1() * t.getT2())
.subscribe(System.out::println);
Here is a visual representation of the above code:
1 2 3 4 5 6 7 8 9 10
v v v v v v v v v v skip(1)
2 3 4 5 6 7 8 9 10
v v v v v v v v v v zipWith
1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9, 9 10 <- sliding window of length 2
v v v v v v v v v v multiply
2 6 12 20 30 42 56 72 90
Unfortunately, this code fails with:
java.lang.IllegalStateException: stream has already been operated upon or closed
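For reference, the same exception can be reproduced without Reactor. This is a minimal sketch in plain Java (the class and method names are made up for illustration) that consumes one Stream instance twice, which is effectively what two subscriptions to the same Flux.fromStream source do:

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class StreamReuseDemo {
    // Consumes the same Stream instance twice and returns the message of the
    // exception raised by the second attempt (hypothetical helper for illustration).
    static String consumeTwice() {
        Stream<Integer> stream = IntStream.rangeClosed(1, 10).boxed();
        stream.collect(Collectors.toList()); // first pass succeeds
        try {
            // second pass on the same, already consumed instance
            stream.skip(1).collect(Collectors.toList());
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeTwice());
    }
}
```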
The obvious workaround is to cache the elements, with a cache size greater than or equal to the stream size:
Flux<Integer> primary = Flux.fromStream(IntStream.rangeClosed(1, 10).boxed()).cache(10);
or to replace the stream with a native Flux source:
Flux<Integer> primary = Flux.range(1, 10);
The second solution simply re-executes the source sequence for the skip(1) subscription.
However, an efficient solution needs a buffer of only two elements. This matters when the stream is backed by a large file:
Files.lines(Paths.get(megaFile));
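To illustrate what I mean by a buffer of size 2, here is a rough plain-Java sketch. It is not a Reactor solution, and `PairwiseWindow` and `pairwise` are hypothetical names used only for illustration. It walks an Iterator once and keeps only the previous element between steps:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.IntStream;

class PairwiseWindow {
    // Applies op to each adjacent pair (x(i-1), x(i)) in a single pass.
    // Only the previous element is carried over between iterations.
    // Results are collected into a list purely to keep the demo simple;
    // for a large source each result would be consumed as it is produced.
    static <T> List<T> pairwise(Iterator<T> source, BinaryOperator<T> op) {
        List<T> out = new ArrayList<>();
        if (!source.hasNext()) {
            return out; // empty input yields no pairs
        }
        T previous = source.next();
        while (source.hasNext()) {
            T current = source.next();
            out.add(op.apply(previous, current));
            previous = current; // slide the window by one element
        }
        return out;
    }

    public static void main(String[] args) {
        Iterator<Integer> it = IntStream.rangeClosed(1, 10).boxed().iterator();
        System.out.println(pairwise(it, (a, b) -> a * b));
    }
}
```

With 1..10 as input this prints [2, 6, 12, 20, 30, 42, 56, 72, 90], matching the diagram above, and the source is read exactly once.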
How can I buffer a stream efficiently so that multiple subscriptions to the primary Flux neither read everything into memory nor re-execute the source?