I have a pull-based streaming data source (much like Kafka), and I'd like to use Reactor for this event-processing application.
Currently I create an infinite sequence of events with an EmitterProcessor. It is subscribed once at startup and never cancelled.
The following code shows what I've done:
public void initialize() {
    EmitterProcessor<Event> emitter = ...; // created elsewhere
    emitter.flatMap(this::step1)
           .flatMap(this::step2)
           .flatMap(this::finalStep)
           //.subscriberContext(...)
           .subscribe();
}
For each event in the initial Flux<Event>, I need to maintain and update a context, so that in the final step I can see the inputs and results of every step and do some reporting.
Passing an immutable Context object from step to step is an option, but then every step() gains an extra parameter, even though not all steps use the Context. A step that does update it has to return a Pair<Context, OtherResult>, which is ugly as well.
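To make that concrete, here is a minimal sketch of the explicit-threading style I want to avoid (all names here are hypothetical, not from my real code):

```java
import java.util.HashMap;
import java.util.Map;

public class ExplicitContextSketch {
    // Hypothetical immutable per-event context.
    record EventContext(String eventId, Map<String, Object> results) {
        EventContext withResult(String step, Object value) {
            var copy = new HashMap<>(results);
            copy.put(step, value);
            return new EventContext(eventId, Map.copyOf(copy));
        }
    }

    // Every step must now accept the context and return a Pair,
    // even steps that never read the context themselves.
    record Pair<T>(EventContext ctx, T value) {}

    static Pair<Integer> step1(EventContext ctx, String event) {
        int result = event.length(); // stand-in for real work
        return new Pair<>(ctx.withResult("step1", result), result);
    }

    public static void main(String[] args) {
        var ctx = new EventContext("evt-1", Map.of());
        var out = step1(ctx, "hello");
        System.out.println(out.ctx().results()); // {step1=5}
    }
}
```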
So I'd prefer something like ThreadLocal<Context>, and in Reactor the obvious replacement is subscriberContext(). However, in my code initialize() is invoked once and the Flux<Event> is subscribe()d once. subscriberContext() works at the subscription level, not at my event level, so there would be only a single Context for the whole stream. That doesn't work for me.
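To illustrate what I mean by "subscription level" (written against the newer contextWrite()/deferContextual() API, the successors of subscriberContext(); the key "id" and value 42 are made up):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class SubscriptionLevelContext {
    public static void main(String[] args) {
        Flux.just("e1", "e2", "e3")
            .flatMap(e -> Mono.deferContextual(ctx ->
                    Mono.just(e + ":" + ctx.get("id"))))
            // One Context for the whole subscription:
            // every event observes the same id=42 entry.
            .contextWrite(Context.of("id", 42))
            .subscribe(System.out::println);
    }
}
```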
The question is: should I treat the event stream as one Flux<Event>, or as many Mono<Event>s with a subscription per event? If a per-event Mono is the best practice, then I can use subscriberContext() directly. But is there any assembly-time overhead, given that the operator chain would then be assembled for every incoming event?
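What I'm imagining is something like the following sketch: wrap each event's processing in its own inner Mono, so that contextWrite() (the successor of subscriberContext()) is applied per inner subscription, i.e. per event. The key "eventId" and its value are made up:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class PerEventContext {
    public static void main(String[] args) {
        Flux.just("a", "bb", "ccc")
            .concatMap(event ->
                Mono.just(event)
                    .flatMap(e -> Mono.deferContextual(ctx ->
                            Mono.just(e + " (ctx=" + ctx.get("eventId") + ")")))
                    // concatMap subscribes to this inner Mono once per event,
                    // so each event gets its own Context instance.
                    .contextWrite(Context.of("eventId", "evt-" + event.length())))
            .subscribe(System.out::println);
    }
}
```

Each inner chain is assembled and subscribed anew for every event, which is exactly where my overhead question comes from.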
In reactor-kafka, each batch of records becomes a Flux<Record>; how does it implement something like a record-level context?
Thanks.