Java 8 Streams cannot be reused. This makes it tricky to build a sliding-window Flux from a Stream in order to compute a relationship such as x(i)*x(i-1).

The following code is based on the idea of a shift operator. I shift the first stream with skip(1) to create a second stream.

import java.util.stream.IntStream;
import reactor.core.publisher.Flux;

Flux<Integer> primary = Flux.fromStream(IntStream.rangeClosed(1, 10).boxed());
Flux<Integer> secondary = primary.skip(1);
primary.zipWith(secondary)
        .map(t -> t.getT1() * t.getT2())
        .subscribe(System.out::println);

Here is a visual representation of the above code:

1 2 3 4 5 6 7 8 9 10
v v v v v v v v v v  skip(1)
2 3 4 5 6 7 8 9 10
v v v v v v v v v v  zipWith
1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9, 9 10 <- sliding window of length 2
v v v v v v v v v v  multiply
2 6 12 20 30 42 56 72 90

Unfortunately, this code fails with:

java.lang.IllegalStateException: stream has already been operated upon or closed
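Because Flux.fromStream wraps a single Stream instance, the second subscription (the one made through skip(1)) tries to traverse that same Stream again. The message itself is the JDK's single-use check on java.util.stream.Stream, so it can be reproduced without Reactor. A minimal sketch; the class and method names are hypothetical:

```java
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class: consumes a Stream once, then tries to consume it again
class StreamReuseDemo {
    static String secondUseMessage() {
        Stream<Integer> stream = IntStream.rangeClosed(1, 10).boxed();
        stream.forEach(x -> { });   // first terminal operation: succeeds
        try {
            stream.count();         // second terminal operation on the same Stream
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // prints: stream has already been operated upon or closed
        System.out.println(secondUseMessage());
    }
}
```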

The obvious workaround is to cache the elements, making sure the cache size is at least the size of the stream:

Flux<Integer> primary = Flux.fromStream(IntStream.rangeClosed(1, 10).boxed()).cache(10);

or to replace the Stream with a Flux that can be subscribed to more than once:

Flux<Integer> primary = Flux.range(1, 10); // start = 1, count = 10, i.e. 1..10

The second solution simply regenerates the whole sequence a second time for the skip(1) subscription.
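The re-execution can be pictured in plain JDK terms (an analogy, not Reactor code): a Supplier builds a fresh Stream on every call, so each "subscriber" gets its own run of the source. The counter shows the source being generated twice. All names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class: zips a sequence with a skip(1) copy of itself by rebuilding the source
class ReExecuteDemo {
    // Counts how many times the source sequence has been generated
    static int executions = 0;

    // Builds a brand-new Stream on every call, like a cold source re-run per subscriber
    static Stream<Integer> freshSource() {
        executions++;
        return IntStream.rangeClosed(1, 10).boxed();
    }

    // Each side of the zip pulls from its own, independently generated Stream
    static List<Integer> adjacentProducts(Supplier<Stream<Integer>> source) {
        Iterator<Integer> primary = source.get().iterator();
        Iterator<Integer> secondary = source.get().skip(1).iterator();
        List<Integer> out = new ArrayList<>();
        while (primary.hasNext() && secondary.hasNext()) {
            out.add(primary.next() * secondary.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(adjacentProducts(ReExecuteDemo::freshSource)); // [2, 6, 12, 20, 30, 42, 56, 72, 90]
        System.out.println("source generated " + executions + " times");   // 2
    }
}
```

For a large file, this pattern means reading the file twice, which is exactly the cost the question wants to avoid.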

However, an efficient solution only needs a buffer of size 2. This matters a lot when the stream is a large file:

Files.lines(Paths.get(megaFile));
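For reference, the single-pass idea behind "a buffer of size 2" can be sketched with plain JDK streams (not Reactor): only the previous element has to be remembered. The helper pairwise below is hypothetical; it wraps the source Iterator in a lazy Stream, so memory stays constant however many lines a file has:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class for illustration
class PairwiseSinglePass {
    // Lazily combines each element with its predecessor, reading the source exactly once
    // and holding only one previous element in memory.
    static <T, R> Stream<R> pairwise(Stream<T> source, BiFunction<T, T, R> combine) {
        Iterator<T> it = source.iterator();
        Iterator<R> pairs = new Iterator<R>() {
            private boolean primed = false; // true once the first element has been read
            private T previous;             // the one-element buffer

            @Override
            public boolean hasNext() {
                if (!primed) {
                    if (!it.hasNext()) {
                        return false;
                    }
                    previous = it.next();
                    primed = true;
                }
                return it.hasNext();
            }

            @Override
            public R next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T current = it.next();
                R result = combine.apply(previous, current);
                previous = current;
                return result;
            }
        };
        // Closing the returned Stream also closes the source (e.g. a Files.lines stream)
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(pairs, Spliterator.ORDERED), false)
                .onClose(source::close);
    }

    public static void main(String[] args) {
        List<Integer> products = pairwise(IntStream.rangeClosed(1, 10).boxed(), (a, b) -> a * b)
                .collect(Collectors.toList());
        System.out.println(products); // [2, 6, 12, 20, 30, 42, 56, 72, 90]
    }
}
```

With a file, the source would be something like Files.lines(path) opened in a try-with-resources block around the returned Stream.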

How can I buffer a stream efficiently so that multiple subscriptions to the primary Flux neither read everything into memory nor re-execute the source?

3 Answers

5
votes

I finally found a solution, although it is not buffer-oriented. The idea was to solve the problem for a sliding window of size 2 first:

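import java.util.Arrays;
import java.util.stream.IntStream;
import reactor.core.publisher.Flux;

Flux<Integer> primary = Flux.fromStream(IntStream.range(0, 10).boxed());
primary.flatMap(num -> Flux.just(num, num))  // emit every element twice
    .skip(1)                                 // shift by one: 0 1 1 2 2 3 ...
    .buffer(2)                               // group into non-overlapping pairs
    .filter(list -> list.size() == 2)        // drop the incomplete last pair
    .map(list -> Arrays.toString(list.toArray()))
    .subscribe(System.out::println);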

A visual representation of the process follows:

0 1 2 3 4 5 6 7 8 9
V V V V V V V V V V    Flux.just(num, num)
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
V V V V V V V V V V    skip(1)
0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
V V V V V V V V V V    buffer(2)
0 1, 1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9, 9
V V V V V V V V V V    filter
0 1, 1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9

This is the output:

[0, 1]
[1, 2]
[2, 3]
[3, 4]
[4, 5]
[5, 6]
[6, 7]
[7, 8]
[8, 9]
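To see why duplicating, skipping one and chunking in twos yields adjacent pairs, the same steps can be re-enacted on a plain JDK List. This is a sketch only (the List materializes everything, unlike the Flux), and the class name is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical class re-enacting flatMap(num -> Flux.just(num, num)).skip(1).buffer(2).filter(size == 2)
class DuplicateSkipChunk {
    static List<List<Integer>> pairs(List<Integer> xs) {
        List<Integer> shifted = xs.stream()
                .flatMap(n -> Stream.of(n, n))   // 0 0 1 1 2 2 ...
                .skip(1)                         // 0 1 1 2 2 3 ...
                .collect(Collectors.toList());
        List<List<Integer>> chunks = new ArrayList<>();
        for (int i = 0; i < shifted.size(); i += 2) {   // non-overlapping chunks of 2, like buffer(2)
            chunks.add(new ArrayList<>(shifted.subList(i, Math.min(i + 2, shifted.size()))));
        }
        return chunks.stream()
                .filter(chunk -> chunk.size() == 2)      // drop the trailing single-element chunk
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> xs = IntStream.range(0, 10).boxed().collect(Collectors.toList());
        System.out.println(pairs(xs)); // [[0, 1], [1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9]]
    }
}
```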

Then I generalized the above idea to create a solution for an arbitrary sliding window size:

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;

public class SlidingWindow {

    public static void main(String[] args) {
        System.out.println("Different sliding windows for sequence 0 to 9:");
        SlidingWindow flux = new SlidingWindow();
        for (int windowSize = 1; windowSize < 5; windowSize++) {
            flux.slidingWindow(windowSize, IntStream.range(0, 10).boxed())
                .map(SlidingWindow::listToString)
                .subscribe(System.out::print);
            System.out.println();
        }

        // show stream difference: x(i)-x(i-1)
        List<Integer> sequence = Arrays.asList(10, 12, 11, 9, 13, 17, 21);
        System.out.println("Show difference 'x(i)-x(i-1)' for " + listToString(sequence));
        flux.slidingWindow(2, sequence.stream())
            .doOnNext(SlidingWindow::printlist)
            .map(list -> list.get(1) - list.get(0))
            .subscribe(System.out::println);
        System.out.println();
    }

    // Starts from windows of size 1 and widens them by one element per addDepth call
    public <T> Flux<List<T>> slidingWindow(int windowSize, Stream<T> stream) {
        if (windowSize > 0) {
            Flux<List<T>> flux = Flux.fromStream(stream).map(ele -> Collections.singletonList(ele));
            for (int i = 1; i < windowSize; i++) {
                flux = addDepth(flux);
            }
            return flux;
        } else {
            return Flux.empty();
        }
    }

    // Pairs each window with the next one, using the same duplicate/skip/buffer trick as above
    protected <T> Flux<List<T>> addDepth(Flux<List<T>> flux) {
        return flux.flatMap(list -> Flux.just(list, list))
            .skip(1)
            .buffer(2)
            .filter(list -> list.size() == 2)
            .map(list -> flatten(list));
    }

    // Merges [a, b, ..., y] and [b, ..., y, z] into [a, b, ..., y, z]
    protected <T> List<T> flatten(List<List<T>> list) {
        LinkedList<T> newl = new LinkedList<>(list.get(1));
        newl.addFirst(list.get(0).get(0));
        return newl;
    }

    static String listToString(List<?> list) {
        return list.stream()
            .map(String::valueOf)
            .collect(Collectors.joining(", ", "[ ", " ], "));
    }

    static void printlist(List<?> list) {
        System.out.print(listToString(list));
    }

}

The output of the above code is as follows:

Different sliding windows for sequence 0 to 9:
[ 0 ], [ 1 ], [ 2 ], [ 3 ], [ 4 ], [ 5 ], [ 6 ], [ 7 ], [ 8 ], [ 9 ], 
[ 0, 1 ], [ 1, 2 ], [ 2, 3 ], [ 3, 4 ], [ 4, 5 ], [ 5, 6 ], [ 6, 7 ], [ 7, 8 ], [ 8, 9 ], 
[ 0, 1, 2 ], [ 1, 2, 3 ], [ 2, 3, 4 ], [ 3, 4, 5 ], [ 4, 5, 6 ], [ 5, 6, 7 ], [ 6, 7, 8 ], [ 7, 8, 9 ], 
[ 0, 1, 2, 3 ], [ 1, 2, 3, 4 ], [ 2, 3, 4, 5 ], [ 3, 4, 5, 6 ], [ 4, 5, 6, 7 ], [ 5, 6, 7, 8 ], [ 6, 7, 8, 9 ], 

Show difference 'x(i)-x(i-1)' for [ 10, 12, 11, 9, 13, 17, 21 ], 
[ 10, 12 ], 2
[ 12, 11 ], -1
[ 11, 9 ], -2
[ 9, 13 ], 4
[ 13, 17 ], 4
[ 17, 21 ], 4
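For comparison, the same windows can be produced in a single pass by keeping a bounded deque of the last windowSize elements, instead of widening windows step by step with addDepth. A plain-JDK sketch with hypothetical names (ArrayDeque rejects null elements, and the windows are collected into a List only for demonstration):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical helper class for illustration
class BoundedDequeWindows {
    // Emits every full window of `size` consecutive elements, reading the source once
    // and never holding more than `size` elements in the deque.
    static <T> List<List<T>> windows(Stream<T> source, int size) {
        List<List<T>> out = new ArrayList<>();
        if (size <= 0) {
            return out; // like slidingWindow above: no windows for a non-positive size
        }
        Deque<T> window = new ArrayDeque<>(size);
        Iterator<T> it = source.iterator();
        while (it.hasNext()) {
            window.addLast(it.next());
            if (window.size() > size) {
                window.removeFirst();               // slide: forget the oldest element
            }
            if (window.size() == size) {
                out.add(new ArrayList<>(window));   // snapshot copy of the current window
            }
        }
        return out;
    }

    // x(i) - x(i-1), computed from windows of size 2
    static List<Integer> differences(List<Integer> xs) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> w : windows(xs.stream(), 2)) {
            out.add(w.get(1) - w.get(0));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(windows(IntStream.range(0, 10).boxed(), 3));
        System.out.println(differences(List.of(10, 12, 11, 9, 13, 17, 21))); // [2, -1, -2, 4, 4, 4]
    }
}
```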

0
votes

I've implemented the following solution:

public <T> Flux<Flux<T>> toSlidingWindow(Flux<T> source, int size) {
    return toSlidingWindow(source, deque -> {
        while (deque.size() > size) {
            deque.poll();
        }
        return Flux.fromIterable(deque);
    });
}

public <T> Flux<Flux<T>> toSlidingWindow(Flux<T> source, Function<Deque<T>, Flux<T>> dequePruneFunction) {
    // One deque per assembled Flux; it holds the elements of the current window
    Deque<T> deque = new LinkedList<>();
    return source.map(element -> {
        deque.offer(element);
        return dequePruneFunction.apply(deque);
    });
}

The window can either have a fixed size or use a custom function to determine the extent of each window.
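The same deque-and-prune idea can be sketched in plain Java, with a Consumer<Deque<T>> standing in for the Function<Deque<T>, Flux<T>> above and each window recorded as a copied List. The class name and the sumAtMost rule are hypothetical, the latter only an example of a custom extent. Like the code above, it also emits the short windows at the start:

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper class for illustration
class PrunedWindows {
    // Appends each element, lets `prune` trim the deque, then records a copy of what remains
    static <T> List<List<T>> windows(List<T> source, Consumer<Deque<T>> prune) {
        Deque<T> deque = new LinkedList<>();
        List<List<T>> out = new ArrayList<>();
        for (T element : source) {
            deque.offer(element);
            prune.accept(deque);
            out.add(new ArrayList<>(deque));
        }
        return out;
    }

    // Fixed-size rule, the same loop as in toSlidingWindow(source, size)
    static <T> Consumer<Deque<T>> fixedSize(int size) {
        return deque -> {
            while (!deque.isEmpty() && deque.size() > size) {
                deque.poll();
            }
        };
    }

    // Hypothetical custom rule: drop the oldest elements while the window's sum exceeds `limit`
    static Consumer<Deque<Integer>> sumAtMost(int limit) {
        return deque -> {
            while (!deque.isEmpty() && deque.stream().mapToInt(Integer::intValue).sum() > limit) {
                deque.poll();
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> xs = List.of(1, 2, 3, 4);
        System.out.println(windows(xs, PrunedWindows.<Integer>fixedSize(2))); // [[1], [1, 2], [2, 3], [3, 4]]
        System.out.println(windows(xs, sumAtMost(5)));                        // [[1], [1, 2], [2, 3], [4]]
    }
}
```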

If multithreading problems arise when using it like this, you could copy the Deque within an acquire/release block, which AtomicReference supports through getAcquire and setRelease (Java 9+). This would ensure that the resulting window Flux remains unchanged by other threads.

Perhaps like so:

public <T> Flux<Flux<T>> toSlidingWindowAsync(Flux<T> source, int size) {
    return toSlidingWindowAsync(source, deque -> {
        while (deque.size() > size) {
            deque.poll();
        }
        return Flux.fromIterable(new LinkedList<>(deque));
    });
}

public <T> Flux<Flux<T>> toSlidingWindowAsync(Flux<T> source, Function<Deque<T>, Flux<T>> dequePruneFunction) {
    AtomicReference<Deque<T>> dequeAtomicReference = new AtomicReference<>(new LinkedList<>());
    return source.map(element -> {
        Deque<T> deque = dequeAtomicReference.getAcquire(); // getAcquire/setRelease require Java 9+
        deque.offer(element);
        Flux<T> windowFlux = dequePruneFunction.apply(deque);
        dequeAtomicReference.setRelease(deque);
        return windowFlux;
    });
}

Passing new LinkedList<>(deque) to Flux.fromIterable gives each resulting sliding window its own copy of the Deque.
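Why the copy matters can be shown without Reactor: Flux.fromIterable reads its Iterable when the window is subscribed, not when it is emitted. The plain-Java sketch below (hypothetical names) stores either the live deque or a copy, and reads the stored windows only after the whole source has been processed, mimicking a late subscriber:

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

// Hypothetical demo class contrasting an aliased deque with per-window copies
class SnapshotVsAlias {
    // Builds fixed-size windows like toSlidingWindow, storing either the live deque or a copy,
    // and only reads the stored windows after the whole source has been consumed.
    static List<List<Integer>> readLater(List<Integer> source, int size, boolean copy) {
        Deque<Integer> deque = new LinkedList<>();
        List<Iterable<Integer>> emitted = new ArrayList<>(); // stand-in for emitted Flux.fromIterable(...) values
        for (Integer element : source) {
            deque.offer(element);
            while (deque.size() > size) {
                deque.poll();
            }
            emitted.add(copy ? new LinkedList<Integer>(deque) : deque);
        }
        // "Subscribe" late: iterate each stored window only now
        List<List<Integer>> seen = new ArrayList<>();
        for (Iterable<Integer> window : emitted) {
            List<Integer> contents = new ArrayList<>();
            window.forEach(contents::add);
            seen.add(contents);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> xs = List.of(1, 2, 3, 4);
        System.out.println(readLater(xs, 2, false)); // [[3, 4], [3, 4], [3, 4], [3, 4]]
        System.out.println(readLater(xs, 2, true));  // [[1], [1, 2], [2, 3], [3, 4]]
    }
}
```

Note that the Reactive Streams specification requires onNext signals to a subscriber to be delivered serially, so within one subscription the map callback is not run concurrently; late reads of a shared deque, as shown here, are the more likely source of surprises.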

0
votes

If you are using Reactor Core 3 (I'm not sure in which version this operator was introduced), you can simply use:

    Flux.fromStream(IntStream.rangeClosed(1, 10).boxed())
            .buffer(2, 1)
            .skipLast(1)
            .map(t -> t.stream().reduce(1, (a, b) -> a * b))  // reduce with identity returns Integer, not Optional
            .subscribe(System.out::println);

The magic is the buffer(2, 1) part: maxSize is 2 and skip is 1. Because maxSize is greater than skip, the buffers overlap (that is, they form sliding windows over the flux), and each buffer is emitted as a List. The skipLast(1) is needed because the last buffer holds only a single element (10) and must be dropped.
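The overlapping behaviour described here can be approximated on a finite List in plain Java, which makes the trailing single-element buffer visible. This is a sketch of the semantics as described in this answer, not Reactor's implementation, and the names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class for illustration
class OverlappingBuffers {
    // Approximates buffer(maxSize, skip) on a finite list: a new buffer opens every
    // `skip` elements, and buffers still open at the end are emitted even if short.
    static <T> List<List<T>> buffer(List<T> xs, int maxSize, int skip) {
        if (maxSize < 1 || skip < 1) {
            throw new IllegalArgumentException("maxSize and skip must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < xs.size(); start += skip) {
            out.add(new ArrayList<>(xs.subList(start, Math.min(start + maxSize, xs.size()))));
        }
        return out;
    }

    // buffer(2, 1), then skipLast(1), then multiply each pair
    static List<Integer> adjacentProducts(List<Integer> xs) {
        List<List<Integer>> buffers = buffer(xs, 2, 1);
        List<Integer> out = new ArrayList<>();
        for (List<Integer> pair : buffers.subList(0, Math.max(0, buffers.size() - 1))) {
            out.add(pair.get(0) * pair.get(1));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> xs = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        List<List<Integer>> buffers = buffer(xs, 2, 1);
        System.out.println(buffers.get(buffers.size() - 1)); // [10]
        System.out.println(adjacentProducts(xs));             // [2, 6, 12, 20, 30, 42, 56, 72, 90]
    }
}
```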