
I expected the Flux chain below to be scheduled and executed via an event loop (as in JavaScript), so that running the code would print the output of the blocking for loop first and only then execute the Flux chain.

But the entire Flux always executes before the for loop starts. [There are some blocking sleep statements, but they are in the two doOnNext stages.]

AtomicInteger atomicInteger = new AtomicInteger(0);

// Reactor: emits 1 and 2, then completes
Flux.generate(synchronousSink -> {
        int next = atomicInteger.incrementAndGet();
        if (next < 3) {
            synchronousSink.next(next);
        } else {
            synchronousSink.complete();
        }
    })
    .doOnNext(i -> {
        System.out.println(
                "A - Received " + i + " by " + Thread.currentThread().getName()
        );
        sleep(Duration.ofSeconds(1)); // sleep(...) is a blocking helper (definition not shown)
    })
    .doOnNext(i -> {
        System.out.println(
                "B - Received " + i + " : by " + Thread.currentThread().getName()
        );
        sleep(Duration.ofSeconds(1));
    })
    .subscribe();


for (int i = 0; i < 5; i++) {
    System.out.println("For " + i + " by " + Thread.currentThread().getName());
    sleep(Duration.ofMillis(500));
}

It prints

A - Received 1 by main
B - Received 1 by main
A - Received 2 by main
B - Received 2 by main
For 0 by main
For 1 by main
For 2 by main
For 3 by main
For 4 by main

Could someone please explain this behavior and answer these questions?

  1. When using Reactor, is using schedulers the only way to achieve async/non-blocking behavior?
  2. If I don't use any schedulers and let the code run on the current thread, can I expect any performance benefit from WebFlux over Spring MVC, even for I/O-intensive applications?

1 Answer

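  1. Blocking a thread is not a proper use of Reactor. Without a scheduler, subscribe() runs the whole chain on the calling thread, which is why everything was printed by main before the for loop started. To move the work off the calling thread, use publishOn/subscribeOn (for example with Schedulers.boundedElastic()); the output should then look like this: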
For 0 by main
A - Received 1 by boundedElastic-3
For 1 by main
For 2 by main
B - Received 1 : by boundedElastic-3
For 3 by main
For 4 by main
A - Received 2 by boundedElastic-3

For more about publishOn vs subscribeOn see: link
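
As a rough sketch of the subscribeOn variant (not the asker's exact code): `Flux.range(1, 2)` stands in for the original `Flux.generate`, and the class name `SubscribeOnSketch`, the `sleepMillis` helper, and the shortened delays are all assumptions made for illustration. It assumes reactor-core is on the classpath.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnSketch {

    // Hypothetical blocking helper standing in for the question's sleep(Duration).
    static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Subscribes on a boundedElastic worker, runs a loop on the calling thread
    // in the meantime, and returns the names of the threads that handled elements.
    static List<String> run() {
        List<String> fluxThreads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flux.range(1, 2) // stands in for the question's Flux.generate (emits 1 and 2)
            .doOnNext(i -> {
                String thread = Thread.currentThread().getName();
                fluxThreads.add(thread);
                System.out.println("A - Received " + i + " by " + thread);
                sleepMillis(100); // blocks the worker thread, not the caller
            })
            .doFinally(signal -> done.countDown())
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe(); // returns immediately; the chain runs on the worker

        for (int i = 0; i < 3; i++) {
            System.out.println("For " + i + " by " + Thread.currentThread().getName());
            sleepMillis(50);
        }

        try {
            done.await(); // keep the JVM alive until the Flux has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return fluxThreads;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The latch is only there because boundedElastic workers are daemon threads: without it, the JVM can exit as soon as main finishes, which is why the sample output above stops before the last element. The exact interleaving of the "For" and "A" lines depends on timing.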

  2. Yes. Reactor (through Reactor Netty) supports non-blocking HTTP (including WebSockets), TCP, and UDP. What's more, WebFlux runs on a Netty server by default, which changes how requests are handled. For example, in Tomcat with Spring MVC a request and its response are handled by the same thread, and that thread waits for the response, so it is blocked. In Netty, one thread can handle sending a request and another can handle receiving the response, so threads do not sit idle waiting for a response.
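
The difference between a thread that waits and one that does not can be sketched with the plain JDK, without Reactor or Netty. This is only an illustration of the idea, not how Netty is implemented: `fakeRemoteCall` is a hypothetical stand-in for a non-blocking I/O call, simulated with `CompletableFuture.delayedExecutor` (Java 9+), and the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

public class NonBlockingWaitSketch {

    // Hypothetical remote call: completes after delayMillis, and no thread
    // is parked on it while the delay is pending.
    static CompletableFuture<String> fakeRemoteCall(int id, long delayMillis) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> "response-" + id, delayed);
    }

    // Starts all calls from the calling thread, waits once for all of them,
    // and returns the elapsed time in milliseconds.
    static long handleAll(int requests, long delayMillis) {
        long start = System.nanoTime();
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            pending.add(fakeRemoteCall(i, delayMillis)); // returns immediately
        }
        // Single wait for the whole batch (only so the demo can measure it).
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        long elapsed = handleAll(20, 200);
        System.out.println("20 simulated calls of 200 ms took about " + elapsed + " ms");
    }
}
```

With a thread that sleeps for each call in turn, 20 calls of 200 ms would take about 4 seconds on one thread. Here the waits overlap, so the total stays close to a single delay. The same reasoning is why the benefit only appears when the I/O itself is non-blocking; blocking calls inside the chain bring back the thread-per-request cost.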