I know subscribeOn is used to switch the executing thread when a sequence is subscribed to, but I found that it does not work with ServerRequest.bodyToMono/Flux.
For example, something like
Flux.just(1, 2, 3)
    .doOnNext(integer -> log.info("test {}", integer))
    .subscribeOn(Schedulers.elastic())
    .subscribe();
changes the execution thread:
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 1
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 2
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 3
But here is what confuses me.
Say I have a Spring WebFlux router:
@Configuration
public class TestRouter {
    @Bean
    public RouterFunction<ServerResponse> testRouterFunction(TestService testService) {
        return route().path("/test", builder -> builder.nest(accept(MediaType.ALL),
            route -> route.PUT("/", req -> {
                Mono<String> valueMono = req.bodyToMono(String.class);
                return ServerResponse.ok().body(testService.test(valueMono), String.class);
            }))).build();
    }
}
and a service:
@Service
@Slf4j
public class TestService {
    public Mono<String> test(Mono<String> mono) {
        return mono
            .doOnSubscribe(subscription -> log.info("on subscribe"))
            .subscribeOn(Schedulers.elastic())
            .doOnNext(s -> log.info("received {}", s))
            .subscribeOn(Schedulers.elastic());
    }
}
The basic logic is that an HTTP PUT request to localhost:port/test receives back, as plain text, whatever it sent to the server.
I am trying to make doOnNext run on a thread other than Spring WebFlux's NIO thread, but no matter where I put subscribeOn, the execution thread is always the NIO thread:
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService : on subscribe
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService : received test
Thanks to @MichaelBerry and @SimonBaslé. Both of you helped me a lot, and I have upvoted both of your answers.
In short, reactor-netty overrides subscribeOn for the HTTP subscription. Using flatMap() to include a separate subscribeOn() on a different Mono/Flux, or using publishOn(), does the job I want.
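
For anyone landing here later, this is a minimal sketch of the two workarounds. It assumes only reactor-core on the classpath and does not use reactor-netty. fakeBody() is a hypothetical stand-in for req.bodyToMono(String.class) that always emits from its own thread named "fake-nio", and ThreadSwitchSketch is just an illustrative class name. I also used Schedulers.boundedElastic() here rather than the Schedulers.elastic() from my question, since elastic() is deprecated in newer Reactor releases.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ThreadSwitchSketch {

    // Hypothetical stand-in for the request body: it always emits from its own
    // "fake-nio" thread, whichever thread performed the subscription.
    static Mono<String> fakeBody() {
        return Mono.create(sink -> {
            Thread emitter = new Thread(() -> sink.success("test"), "fake-nio");
            emitter.setDaemon(true);
            emitter.start();
        });
    }

    static String currentThread() {
        return Thread.currentThread().getName();
    }

    // subscribeOn only moves the subscribe call; the value still arrives
    // on the thread the source emits from.
    static String withSubscribeOn() {
        return fakeBody()
                .subscribeOn(Schedulers.boundedElastic())
                .map(s -> currentThread())
                .block(Duration.ofSeconds(5));
    }

    // publishOn moves everything downstream of it onto the given scheduler.
    static String withPublishOn() {
        return fakeBody()
                .publishOn(Schedulers.boundedElastic())
                .map(s -> currentThread())
                .block(Duration.ofSeconds(5));
    }

    // flatMap into a separate Mono that carries its own subscribeOn.
    static String withFlatMap() {
        return fakeBody()
                .flatMap(s -> Mono.fromCallable(() -> currentThread())
                        .subscribeOn(Schedulers.boundedElastic()))
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println("subscribeOn: " + withSubscribeOn());
        System.out.println("publishOn:   " + withPublishOn());
        System.out.println("flatMap:     " + withFlatMap());
    }
}
```

In this sketch the subscribeOn variant still reports the "fake-nio" thread, while the other two report a boundedElastic worker. Applied to my TestService, that would mean putting publishOn(Schedulers.boundedElastic()) before doOnNext, or moving the work into a flatMap whose inner Mono has its own subscribeOn.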