3
votes

I'm new to WebFlux and I'm trying to execute multiple Monos with Flux, but I think I'm doing it wrong. Is this the best approach to execute multiple Monos and collect the results into a list?

Here is my code:

    mainService.getAllBranch()
            .flatMapMany(branchesList -> {
                List<Branch> branchesList2 = (List<Branch>) branchesList.getData();
                List<Mono<Transaction>> trxMonoList= new ArrayList<>();

                branchesList2.stream().forEach(branch -> {
                    trxMonoList.add(mainService.getAllTrxByBranchId(branch.branchId));
                });
                return Flux.concat(trxMonoList); // <--- is there any other way than using concat?
            })
            .collectList()
            .flatMap(resultList -> combineAllList());

    interface MainService{
            Mono<RespBody> getAllBranch();
            Mono<RespBody> getAllTrxByBranchId(String branchId); // calls a URL, e.g. http://trx.com/{branchId}
    }

So far, I can explain the code above like this:

  1. Get all branches.
  2. Iterate through branchesList2 and add a Mono for each branch to trxMonoList.
  3. Return Flux.concat. This is where I'm not sure whether it's the right way, but it works.
  4. Combine all the lists.

I'm just confused: is this the proper way to use Flux in my context, or is there a better way to achieve what I'm trying to do?


2 Answers

2
votes

You need to refactor your code a little to make it reactive.

    mainService.getAllBranch()
            // (1) cast as in the question, since getData() is not typed as List<Branch> there
            .flatMapMany(branchesList -> Flux.fromIterable((List<Branch>) branchesList.getData()))
            // (2)
            .flatMap(branch -> mainService.getAllTrxByBranchId(branch.branchId))
            .collectList()
            .flatMap(resultList -> combineAllList());

1) Create a Flux of branches from the List.

2) Iterate through each element and call the service.

You shouldn't use the Stream API here, because Reactor has equivalent operators that are adapted and optimised for asynchronous, multithreaded execution.
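One thing to keep in mind when swapping `Flux.concat` for `flatMap`: `flatMap` subscribes to the inner Monos eagerly and emits results as they arrive, so the order of the collected list is not guaranteed. `concatMap` runs them one at a time in source order (like `Flux.concat`), and `flatMapSequential` subscribes eagerly but still emits in source order. Below is a minimal sketch of the three variants. It assumes `reactor-core` is on the classpath, and `getAllTrxByBranchId` here is a hypothetical stub that returns a `String` instead of calling a real service.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BranchTrxSketch {

    // Hypothetical stand-in for mainService.getAllTrxByBranchId(...).
    // "b1" is deliberately slower so that ordering differences can show up.
    static Mono<String> getAllTrxByBranchId(String branchId) {
        long delayMs = branchId.equals("b1") ? 50 : 5;
        return Mono.just("trx-" + branchId).delayElement(Duration.ofMillis(delayMs));
    }

    // Eager subscription; results are emitted as they complete (order not guaranteed).
    static Mono<List<String>> fetchConcurrently(List<String> branchIds) {
        return Flux.fromIterable(branchIds)
                .flatMap(BranchTrxSketch::getAllTrxByBranchId)
                .collectList();
    }

    // One inner Mono at a time, in source order (same ordering as Flux.concat).
    static Mono<List<String>> fetchOneByOne(List<String> branchIds) {
        return Flux.fromIterable(branchIds)
                .concatMap(BranchTrxSketch::getAllTrxByBranchId)
                .collectList();
    }

    // Eager subscription, but results are emitted in source order.
    static Mono<List<String>> fetchConcurrentlyInOrder(List<String> branchIds) {
        return Flux.fromIterable(branchIds)
                .flatMapSequential(BranchTrxSketch::getAllTrxByBranchId)
                .collectList();
    }

    public static void main(String[] args) {
        List<String> ids = List.of("b1", "b2", "b3");
        // block() is only used here to print from main; avoid it inside a WebFlux handler.
        System.out.println(fetchConcurrently(ids).block());
        System.out.println(fetchOneByOne(ids).block());
        System.out.println(fetchConcurrentlyInOrder(ids).block());
    }
}
```

If the order of the transactions does not matter for `combineAllList()`, plain `flatMap` is usually the simplest choice; otherwise `flatMapSequential` keeps the calls concurrent while preserving order.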

2
votes

The real problem here is that you shouldn't be hitting a Mono multiple times within a Flux; that will give you problems. If you are designing the API, you should change it so that it does what you want in a properly reactive manner.

    interface MainService{
            Flux<Branch> getAllBranch();
            Flux<Transaction> getAllTrxByBranchId(Flux<String> branchIds);
    }

Then your code becomes simpler and the reactive framework will work properly.

    mainService.getAllTrxByBranchId(mainService.getAllBranch().map(Branch::getId));
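To make the Flux-based interface concrete, here is a rough sketch of how it could be implemented and called. Everything beyond the two interface methods is an assumption for illustration: the `Branch` and `Transaction` records (Java 16+), the in-memory `InMemoryMainService`, and the private `fetchOne` helper are hypothetical, and `reactor-core` is assumed to be on the classpath. Note that the per-branch lookup does not disappear; it just moves inside the service.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxApiSketch {

    // Hypothetical domain types; the real ones are not shown in the question.
    record Branch(String id) {
        String getId() { return id; }
    }

    record Transaction(String branchId, long amount) {}

    interface MainService {
        Flux<Branch> getAllBranch();
        Flux<Transaction> getAllTrxByBranchId(Flux<String> branchIds);
    }

    // Hypothetical in-memory implementation standing in for the remote calls.
    static class InMemoryMainService implements MainService {
        @Override
        public Flux<Branch> getAllBranch() {
            return Flux.just(new Branch("b1"), new Branch("b2"));
        }

        @Override
        public Flux<Transaction> getAllTrxByBranchId(Flux<String> branchIds) {
            // The fan-out now happens here; concatMap keeps the source order.
            return branchIds.concatMap(this::fetchOne);
        }

        private Mono<Transaction> fetchOne(String branchId) {
            return Mono.just(new Transaction(branchId, 100L));
        }
    }

    // Caller side: same shape as the one-liner above, plus collectList().
    static Mono<List<Transaction>> loadAll(MainService mainService) {
        return mainService
                .getAllTrxByBranchId(mainService.getAllBranch().map(Branch::getId))
                .collectList();
    }

    public static void main(String[] args) {
        // block() is only for this standalone demo.
        System.out.println(loadAll(new InMemoryMainService()).block());
    }
}
```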