I have a Flux<Foo> from the db (for example, 5 elements). I need to get some info from each Foo, collect all of it into a Mono<MyRequest>, send it to another REST resource, get a Mono<MyResponse>, and use the info from it in each Foo. I did it in Flux.flatMap() with a lot of Mono.zipWith() and Mono.zipWhen() calls, but creating MyRequest and sending it to the resource occur 5 times, on 5 threads.

Flux<Foo> flux = dao.getAll();
Flux<Foo> fluxAfterProcessing = flux.flatMap(foo -> monoFilters.map(...));
Mono<Tuple2<MyRequest, MyResponse>> mono =
    monoFilters.flatMap(filter -> monoRequest.map(...))
        .zipWhen(request -> api.send(request))
        .flatMap(tuple -> monoResponseFilters.map(...));
return fluxAfterProcessing.flatMap(foo ->
    monoResponseFilters.zipWith(mono).map(...));

How can I run my Mono functions only once, on one thread, inside the Flux?
With flatMap you do actually get at least as many emits as there are in initialFlux. I assume what you actually wanted to use is collectList(), is it not? – M. Prokhorov
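
For illustration, here is a minimal sketch of the collectList() idea from the comment above. It assumes Project Reactor is on the classpath and Java 16+ (for records). The Foo, MyRequest and MyResponse shapes, the send() method (standing in for api.send(request)) and the sendCount counter are hypothetical stand-ins, not the asker's real types, and the filter Monos from the question are left out. The point is only that the request is built and sent inside a single subscription, after all elements have been gathered, rather than once per element inside flatMap.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class BatchOnce {
    // Hypothetical stand-ins for the types in the question.
    record Foo(int id, String info) {}
    record MyRequest(List<String> infos) {}
    record MyResponse(String summary) {}

    // Counts how many times the remote call is actually executed.
    static final AtomicInteger sendCount = new AtomicInteger();

    // Hypothetical stand-in for api.send(request); runs once per subscription.
    static Mono<MyResponse> send(MyRequest request) {
        return Mono.fromCallable(() -> {
            sendCount.incrementAndGet();
            return new MyResponse("received " + request.infos().size());
        });
    }

    static Flux<Foo> process(Flux<Foo> foos) {
        return foos
            // Gather every Foo first, so there is one list instead of N elements.
            .collectList()
            .flatMapMany(list -> {
                // Build a single request from the info of all elements.
                MyRequest request =
                    new MyRequest(list.stream().map(Foo::info).toList());
                // Send it once, then fan the single response back out to each Foo.
                return send(request).flatMapMany(response ->
                    Flux.fromIterable(list)
                        .map(foo -> new Foo(foo.id(),
                                            foo.info() + ":" + response.summary())));
            });
    }

    public static void main(String[] args) {
        Flux<Foo> source = Flux.range(1, 5).map(i -> new Foo(i, "info" + i));
        List<Foo> result = process(source).collectList().block();
        System.out.println(result);
        System.out.println("send() calls: " + sendCount.get()); // prints "send() calls: 1"
    }
}
```

If the shared Mono has to stay a separate variable that is zipped inside flatMap, as in the question, calling cache() on it is another option worth trying: a cold Mono re-runs its pipeline for every subscriber, while a cached one replays the first result to later subscribers.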