
I know subscribeOn used to switch executing thread when sequence being subscribe, but I found it's not working with ServerRequest.bodyToMono/Flux

Something like

          .doOnNext(integer -> log.info("test {}",integer))

will make execution thread change

INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 1
INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 2
INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 3

But what confused me is

Say I have a Spring WebFlux Router:

public class TestRouter {
    public RouterFunction<ServerResponse> testRouterFunction(TestService testService) {
        return route().path("/test", builder -> builder.nest(accept(MediaType.ALL),
          route -> route.PUT("/", req -> {
              Mono<String> valueMono = req.bodyToMono(String.class);
              return ServerResponse.ok().body(testService.test(valueMono), String.class);

and a Service:

public class TestService {
    public Mono<String> test(Mono<String> mono) {
        return mono
          .doOnSubscribe(subscription -> log.info("on subscribe"))
          .doOnNext(s -> log.info("received {}", s))

basic logic is http put request to localhost:port/test will receive what it send to server as plain text

I try to make doOnNext run on other Thread rather than Spring WebFlux's NIO Thread, no matter where I put


the execution Thread always be NIO Thread:

INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService     : on subscribe
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService     : received test

Thanks to @MichaelBerry @SimonBaslé, Both of you helps me a lot, upvote both of your answers

In short reactor-netty will overriden subscribeOn for http subscription, use a flatMap() to include a separate subscribeOn() on a different Mono/Flux or publishOn() can do the job I want


This isn't something you can change - it's only the last subscribeOn() call in the chain before subscribe() is called that's honoured, so it's up to WebFlux to use whatever scheduler it wants to. In this case it looks like it's handling the requests in an NIO driven event loop or similar.

However, you can include a flatMap() call in your chain, for which you can specify a separate subscribeOn() which won't be overriden. This may be an option depending on your use case, as you could do the bulk of the work in a publisher defined in the flatMap() call.