My usecase is to create an reactive endpoint like this :
public Flux<ServerEvent> getEventFlux(Long forId){
ServicePoller poller = new ServicePollerImpl();
Map<String,Object> params = new HashMap<>();
params.put("id", forId);
Flux<Long> interval = Flux.interval(Duration.ofMillis(pollDuration));
Flux<ServerEvent> serverEventFlux =
Flux.fromStream(
poller.getEventStream(url, params) //poll a given endpoint after a fixed duration.
);
Flux<ServerEvent> sourceFlux= Flux.zip(interval, serverEventFlux)
.map(Tuple2::getT2); // Zip the two streams.
/* Here I want to store data from sourceFlux into a collection whenever some data arrives without disturbing the downstream processing in Spring. So that I can access collection later on without polling again */
This sends back the data to front end as soon as it is available , however my second use case is to pool that data as it arrives into a separate collection , so that if a similar request arrives later on , I can offload the whole data from the pool without hitting the service again .
I tried to subscribe the flux , buffer , cache and collect the flux before returning from the original flux the controller , but all of that seems to close the stream hence Spring cant process it.
What are my options to tap into the flux and store values into a collection as and when they arrive without closing the flux stream ?
Exception encountered :
java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:343) ~[na:1.8.0_171] at java.util.stream.ReferencePipeline.iterator(ReferencePipeline.java:139) ~[na:1.8.0_171] at reactor.core.publisher.FluxStream.subscribe(FluxStream.java:57) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE] at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE] at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:573) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE] at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:326) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
java.util.streamis not related to Spring Flux. You are using Java 8 streams somewhere. Without any code it's difficult to provide help. - a better oliver