1
votes

My use case is to create a reactive endpoint like this:

public Flux<ServerEvent> getEventFlux(Long forId){
    ServicePoller poller = new ServicePollerImpl();
    Map<String,Object> params =  new HashMap<>();
    params.put("id", forId);

    Flux<Long> interval = Flux.interval(Duration.ofMillis(pollDuration));
    Flux<ServerEvent> serverEventFlux =
            Flux.fromStream(
                    poller.getEventStream(url, params) //poll a given endpoint after a fixed duration.
            );
    Flux<ServerEvent> sourceFlux = Flux.zip(interval, serverEventFlux)
            .map(Tuple2::getT2); // Zip the two streams.
    /* Here I want to store data from sourceFlux into a collection whenever
       some data arrives, without disturbing the downstream processing in
       Spring, so that I can access the collection later without polling again. */

This sends the data back to the front end as soon as it is available. However, my second use case is to pool that data into a separate collection as it arrives, so that if a similar request arrives later, I can serve the whole data set from the pool without hitting the service again.

I tried to subscribe to the flux, and to buffer, cache, and collect it before returning the original flux from the controller, but all of these seem to close the stream, so Spring can't process it.

What are my options to tap into the flux and store values in a collection as they arrive, without closing the stream?

Exception encountered:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:343) ~[na:1.8.0_171]
    at java.util.stream.ReferencePipeline.iterator(ReferencePipeline.java:139) ~[na:1.8.0_171]
    at reactor.core.publisher.FluxStream.subscribe(FluxStream.java:57) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
    at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:573) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
    at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:326) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]

2
First of all, Spring has explicit support for caching. Second, java.util.stream is not related to Spring's Flux; you are using Java 8 streams somewhere. Without any code it's difficult to provide help. - a better oliver
Added more code to better explain the problem. - Samrat

2 Answers

1
votes

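poller.getEventStream returns a Java 8 stream, which can be consumed only once. Because Flux.fromStream(Stream) wraps that single stream instance, any second subscription to the Flux fails with the IllegalStateException shown in the question. You can either convert the stream to a collection first, or defer the call to poller.getEventStream by passing a supplier, so that each subscription gets a fresh stream: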

Flux.fromStream(
  () -> poller.getEventStream(url, params)
);
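
To see why the supplier helps, here is a minimal plain-JDK sketch (no Reactor involved) of the same single-use behaviour. The class and method names are made up for illustration; only the java.util.stream calls are real API.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original code.
class SingleUseStreamDemo {

    // Runs two terminal operations on the same Stream instance.
    // Returns true if the second one is rejected.
    static boolean secondUseFails(Stream<String> stream) {
        stream.collect(Collectors.toList());     // first use succeeds
        try {
            stream.collect(Collectors.toList()); // second use is rejected
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Asks the supplier for a fresh Stream on every call,
    // so calling this repeatedly is safe.
    static List<String> consume(Supplier<Stream<String>> source) {
        return source.get().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails(Stream.of("a", "b")));

        Supplier<Stream<String>> source = () -> Stream.of("a", "b");
        System.out.println(consume(source));
        System.out.println(consume(source)); // works again: new stream each time
    }
}
```

The supplier plays the same role here as the lambda passed to Flux.fromStream above: nothing is created until someone actually asks for the data, and every request gets its own stream.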
1
votes

The solution that worked for me, as suggested by @a better oliver:

public Flux<ServerEvent> getEventFlux(Long forId){
        ServicePoller poller = new ServicePollerImpl();
        Map<String,Object> params =  new HashMap<>();
        params.put("id", forId);

        Flux<Long> interval = Flux.interval(Duration.ofMillis(pollDuration));
        Flux<ServerEvent> serverEventFlux =
                Flux.fromStream(
                        // The supplier defers the call, so each subscription gets a fresh stream.
                        () -> poller.getEventStream(url, params)
                                // Record each event in the sink as it passes through.
                                .peek(se -> reactSink.addtoSink(forId, se))
                );
        Flux<ServerEvent> sourceFlux = Flux.zip(interval, serverEventFlux)
                .map(Tuple2::getT2);

        return sourceFlux;

    }
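
The reactSink object is not shown above. As a rough sketch only, something like the following would do the job, assuming a thread-safe map keyed by the request id. EventSink, addToSink, eventsFor and passThrough are hypothetical names for illustration, and String stands in for ServerEvent so the example runs on the plain JDK.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for reactSink: pools events per request id.
class EventSink<T> {
    private final Map<Long, List<T>> eventsById = new ConcurrentHashMap<>();

    // Appends an event to the list for this id, creating the list on first use.
    void addToSink(Long id, T event) {
        eventsById.computeIfAbsent(id, key -> new CopyOnWriteArrayList<>()).add(event);
    }

    // Returns a read-only view of the events pooled so far (empty if none).
    List<T> eventsFor(Long id) {
        List<T> events = eventsById.get(id);
        if (events == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(events);
    }
}

class EventSinkDemo {
    // Taps the stream with peek: each element is copied into the sink
    // and still flows on to the downstream consumer unchanged.
    static List<String> passThrough(EventSink<String> sink, Long id, Stream<String> events) {
        return events.peek(e -> sink.addToSink(id, e)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        EventSink<String> sink = new EventSink<>();
        List<String> delivered = passThrough(sink, 42L, Stream.of("e1", "e2"));
        System.out.println(delivered);           // what the caller received
        System.out.println(sink.eventsFor(42L)); // what was pooled on the side
    }
}
```

One caveat: the Javadoc describes Stream.peek as existing mainly to support debugging, and since Java 9 some terminal operations such as count() may skip it entirely. It works here because every element is actually pulled through the pipeline.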