I have below reactive code using flux in reactor core:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> redisHashReactiveCommands.hmset(key, map))
//.flatMap(... //want to store same data async into kafka with its own back pressure handling)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
.doOnComplete(() -> log.debug("On completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
As you can see, I have back pressure handling on this for external source to my process (FluxSink.OverflowStrategy.LATEST). However, I also want to configure back pressure for my process to redis (redisHashReactiveCommands.hmset(key, map)) since it can be a bigger bottleneck than external source to my process. I expect I'd need to create another flux for redis part and link it with this flux, but how do I achieve this since .flatMap works on individual item and not a stream of items?
Also, I want to store the same emitted item into Kafka as well but chaining flapMap's doesn't seem to work.. is there an easy way to link all these together in one set of functional calls (external source -> my process, my process -> redis, my process -> kafka)?