3
votes

I have Flux<Foo> from db (for example 5 elements). I need to get some info from each Foo, set all of it to Mono<MyRequest>, send to another rest resource, get a Mono<MyResponse> and use all info from it in each Foo.

I did it in Flux.flatMap() with a lot Mono.zipWith() and Mono.zipWhen(), but creating MyRequest and sending to resource occur 5 times by 5 threads.

Flux<Foo> flux = dao.getAll();
Flux<Foo> fluxAfterProcessing = flux.flatMap(foo -> monoFilters.map(...));
Mono<Tuple2<MyRequest, MyResponse>> mono = 
                      monoFilters.flatMap(filter -> monoRequest.map(...))
                                 .zipWhen(request -> api.send(request))
                                 .flatMap(tuple -> monoResponseFilters.map(...));
return fluxAfterProcessing.flatMap(foo -> 
                       monoResponseFilters.zipWith(mono).map(...))

How can i process my Mono functions only once by 1 Thread inside Flux?

1
By applying flatMap you do actually get at least as many emits as there is in initial Flux. I assume what you actually wanted to use is collectList(), is it not?M. Prokhorov

1 Answers

1
votes

Let's assume that what this task is reads like this:

  • Get some values from database
  • When all values arrive, wrap them in request and send out
  • Zip result with response

Then this leads us to something like this:

Flux<Foo> foos = dao.getAll();
Mono<List<Foo>> everything = foos.collectList();

Mono<MyRequest> request = everything
    // collect the data into another Mono, then into request
    .map(list -> list.stream().map(Foo::getData).collect(toList()))
    .map(data -> new MyRequest(data));

return request.zipWhen(request -> api.send(request));

Alternatively, you can collect build request a little easier if you map the initial foos:

Flux<Data> data = dao.getAll().map(Foo::getData);
Mono<MyRequest> request = data.collectList().map(MyRequest::new);