1
votes

I have an interesting problem which I don't know how to solve without calling a block() method.

  1. my method receives a user as an argument.

  2. it calls an external service and receives a Mono

  3. if Mono does not contain an error and user.getDepartment().startsWith("Development") I want to add this to the Flux

  4. Flux should be a result of my method

For now I unfortunatelly should initialize Flux from the list before calling the block()-method for Mono that I receive:

  Flux<User> getUsers(User user) {

     List<Users> developmentUsers = new ArrayList<User>();

     while (user.containsManager()) {

         val resultUser = externalUserServiceClient.getManager(user).block(); //externalUserServiceClient.getManager(user) should return a Mono<User>
         if (resultUser.getDepartment().startsWith("Development"))
             developemtnUsers.add(resultUser);
         user = resultUser;
     }

     return Flux.fromIterable(developmentUsers);
  }

I am sure there should be a way not to interrupt the async processes chain. Do you know how?

2

2 Answers

2
votes

You can use Mono#expand that recursively applies getManager function and combines results into Flux<User>:

Flux<User> getUsers(User user) {
  return getManager(user).expand(manager -> getManager(manager));
}

Mono<User> getManager(User user) {
  if (user.containsManager()) {
    return externalUserServiceClient.getManager(user)
      .filter(manager -> manager.getDepartment().startsWith("Development"));
  } else {
    return Mono.empty();
  }
}
0
votes

By using

externalUserServiceClient.getManager(user).map(manager -> ...);

//or

externalUserServiceClient.getManager(user).flatMap(manager -> ...);