
I have an application that receives events (using Spring Application Listeners) and I want to create a Flux of some historical events (that might be read from persistence) and an infinite stream of incoming events. The goal is to provide a public method that returns a Flux of all events that have ever been observed, i.e., the concatenation of the initial historical events, all events observed and all future events. The following service is supposed to provide the API in terms of the method stream().

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

@Service
public class EventService implements ApplicationListener<Event> {

    private Flux<Integer> eventStream = getHistoricalEvents();

    private Flux<Integer> getHistoricalEvents() {
        return Flux.just(1, 2, 3, 4, 5);
    }

    @Override
    public void onApplicationEvent(Event event) {
        eventStream = eventStream.concatWithValues(event.hashCode());
    }

    public Flux<Integer> stream() {
        return eventStream;
    }
}

This works to achieve the first two goals, i.e., clients receive the initial set of historical events and all events that have been observed so far.

How can I add future events? Ideally, I am looking for a solution that achieves all that idiomatically as I am new to reactive programming.

What do you mean by "expose to clients"? I'd suggest you rather look into something like Kafka which pretty much does what you're looking for. - daniu
I just mean provide a public method like in the example. Kafka is not available for certain reasons. I have rephrased this part. Thanks for the hint. - Hyggenbodden
Why would this not work for future events? Or am I missing something? Every time an event is received via the listener's lifecycle method, its hash code is taken and added to the stream, whether that happens now or later... - alainlompo

1 Answer


You've already worked out how to concatenate the persisted historical events with the events observed so far. As for all future events, it sounds very much like you need Flux.cache():

Turn this Flux into a hot source and cache last emitted signals for further Subscriber. Will retain an unbounded volume of onNext signals. Completion and Error will also be replayed.
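
As a rough illustration of how cache() could be wired in, here is a minimal sketch rather than a definitive implementation. It assumes Reactor 3.4 or later for the Sinks API. The class name CachedEventStream and the publish(int) method are hypothetical stand-ins for the service and its onApplicationEvent callback. Instead of reassigning the field on every event, the sketch keeps a single long-lived Flux: historical events concatenated with a sink that receives live events, then cached so that every subscriber gets the full replay followed by whatever arrives later.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Hypothetical stand-in for the EventService in the question.
public class CachedEventStream {

    // Buffers live events until the cached Flux subscribes to it (assumption: Reactor 3.4+).
    private final Sinks.Many<Integer> liveEvents =
            Sinks.many().unicast().onBackpressureBuffer();

    // Historical events first, then live events; cache() replays everything seen so far
    // to each new subscriber and keeps delivering later events.
    private final Flux<Integer> eventStream =
            Flux.concat(Flux.just(1, 2, 3, 4, 5), liveEvents.asFlux()).cache();

    // Hypothetical method; in the question this would be called from onApplicationEvent.
    public void publish(int event) {
        liveEvents.tryEmitNext(event);
    }

    public Flux<Integer> stream() {
        return eventStream;
    }

    // Small demonstration: one subscriber joins early, one joins late.
    public static List<List<Integer>> demo() {
        CachedEventStream service = new CachedEventStream();
        service.publish(6);                        // emitted before anyone subscribed

        List<Integer> early = new ArrayList<>();
        service.stream().subscribe(early::add);    // triggers the upstream subscription
        service.publish(7);

        List<Integer> late = new ArrayList<>();
        service.stream().subscribe(late::add);     // receives the replayed history first
        service.publish(8);                        // delivered to both subscribers

        return List.of(early, late);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Two caveats about this sketch: tryEmitNext returns an EmitResult that is ignored here, and it can report a failure if events are published from several threads at once, so a real listener may need to serialize emissions or check the result. Also, as the quoted documentation says, the cache retains an unbounded number of signals, so memory grows with every event.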