
I'm a bit confused about which thread (and which thread pool) each request runs on in Reactor. I read https://projectreactor.io/docs/core/release/reference but it's still not clear to me. Here is an example:

@Autowired
UserRepository userRepository;

@GetMapping
Flux<User> findAll() {
    log.info("findAll request arrived");
    final Flux<User> defer = Flux.defer(() -> {
        return Flux.fromIterable(userRepository.findAll());
    });
    return defer;
}

log: [boundedElastic-4] - INFO - findAll request arrived

The GET method is executed on a thread from the Schedulers.boundedElastic pool (intended for I/O-bound work, according to the documentation).

@PostMapping
Mono<User> save(@RequestBody User user) {
    log.info("save request arrived");
    final Mono<User> newUser = Mono.fromCallable(() -> {
        final User userMono = userRepository.save(user);
        log.info("user saved!");
        return userMono;
    });
    return newUser.subscribeOn(Schedulers.boundedElastic());
}

log: [reactor-http-nio-6] - INFO - save request arrived

The POST method with a body runs on a thread from the http-nio pool.

@PostMapping("/test")
Mono<User> test() {
    log.info("test");
    return Mono.just(new User());
}

A POST without a body also runs on Schedulers.boundedElastic.

@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
    NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
    final ReactorResourceFactory reactorResourceFactory = new ReactorResourceFactory();
    reactorResourceFactory.setLoopResources(LoopResources.create("http-nio", 4, true));
    reactorResourceFactory.setUseGlobalResources(false);
    factory.setResourceFactory(reactorResourceFactory);
    factory.setPort(8080);
    return factory;
}

This is how I can configure the http-nio thread pool.

So, my questions are:

  1. Why are POST methods with a body handled by the http-nio thread pool?
  2. The http-nio thread pool is supposed to be a small thread pool, so why do POST methods with a body (which I assume are considered blocking code) run on it?
  3. Does it make sense to return newUser.subscribeOn(Schedulers.boundedElastic()), or is the work supposed to stay on the same thread?
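
To make question 3 concrete, here is a minimal sketch of what I understand the offloading to mean. It is a plain-JDK analogy, not Reactor code: the class OffloadSketch, the helper namedPool, and the pool names are hypothetical stand-ins for the http-nio event loop and the boundedElastic scheduler. The handler starts on the small pool and the blocking part is handed to the larger pool, which is what I assume subscribeOn(Schedulers.boundedElastic()) does for the save call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class OffloadSketch {

    // Hypothetical helper: a fixed pool whose daemon threads are named "<prefix>-<n>".
    public static ExecutorService namedPool(String prefix, int size) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(size, runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
    }

    // Starts on the small "event loop" pool, then hands the blocking step
    // to the larger pool. Returns "<handler thread> -> <blocking-work thread>".
    public static String handle(ExecutorService eventLoop, ExecutorService blockingPool)
            throws Exception {
        return CompletableFuture
                // Stand-in for the controller method being invoked on the event loop.
                .supplyAsync(() -> Thread.currentThread().getName(), eventLoop)
                // Stand-in for the blocking repository call, offloaded to the other pool.
                .thenCompose(handlerThread -> CompletableFuture.supplyAsync(
                        () -> handlerThread + " -> " + Thread.currentThread().getName(),
                        blockingPool))
                .get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService eventLoop = namedPool("http-nio", 1);
        ExecutorService blockingPool = namedPool("boundedElastic", 4);
        System.out.println(handle(eventLoop, blockingPool));
        eventLoop.shutdown();
        blockingPool.shutdown();
    }
}
```

In this sketch the first thread name comes from the small pool and the second from the larger one, so the small pool is free again as soon as the handler returns. Whether Reactor behaves the same way for the save endpoint is exactly what I am trying to confirm.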