9
votes

I am just beginning to explore Project Reactor and its abstractions Mono and Flux, and would like to understand the basic differences from the bare-bones Java 8 CompletableFuture.

Here is the simple code I have:

public static void main(String[] args) throws Exception {

    Mono.fromCallable(() -> getData())
            .map(s -> s + " World ")
            .subscribe(s -> System.out.println(s));

    CompletableFuture.supplyAsync(() -> getData())
            .thenAccept(System.out::println);

    System.out.println(Thread.currentThread()+" End ");
}

private static String getData() {

    int j=0;

    for(int i=0; i<Integer.MAX_VALUE; i++){
        j = j - i%2;
    }

    System.out.println(Thread.currentThread()+" - "+j);
    return " Hello ";
}

First, no surprises with the CompletableFuture: supplyAsync schedules the function for execution on the ForkJoinPool, the "End" line prints immediately, and the program terminates because the main thread is very short-lived here. As expected.

But Mono.fromCallable(...) blocks the main thread. Also, the thread name printed in getData() is the main thread. So I see sequential, blocking behavior rather than non-blocking (async) behavior. Is it blocking because I called subscribe on the same thread? Can someone explain this, please?


1 Answer

5
votes

Is it because I had applied a subscribe function on the same thread, it is blocking?

This is exactly what seems to happen.

This specific behavior surprises me a little, since it is not how most pipelines behave. Most pipelines contain, one way or another, some operation that makes the pipeline asynchronous. publishOn and subscribeOn are the obvious examples, but flatMap might also have such an effect (for instance when the inner publisher it merges is itself asynchronous), and probably many others do too. In those cases, subscribe returns immediately.
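To make the difference concrete, here is a minimal sketch that runs the same fromCallable pipeline with and without subscribeOn and reports which thread executed the callable. The class name SubscribeOnSketch and the helper workerThread are made up for illustration, and the choice of Schedulers.boundedElastic() is an assumption; any scheduler would show the same effect.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SubscribeOnSketch {

    // Hypothetical helper: subscribes to a Mono and returns the name of the
    // thread that actually executed the callable.
    static String workerThread(boolean async) throws InterruptedException {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Mono<String> mono = Mono.fromCallable(() -> {
            seen.set(Thread.currentThread().getName());
            return " Hello ";
        });
        if (async) {
            // Move the subscription (and therefore the callable) to another thread.
            mono = mono.subscribeOn(Schedulers.boundedElastic());
        }

        mono.map(s -> s + " World ")
            .doFinally(signal -> done.countDown())
            .subscribe(System.out::println);

        // Without a scheduler the work is already finished at this point;
        // with subscribeOn we have to wait for the worker thread.
        done.await(5, TimeUnit.SECONDS);
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caller:           " + Thread.currentThread().getName());
        System.out.println("no scheduler:     " + workerThread(false));
        System.out.println("with subscribeOn: " + workerThread(true));
    }
}
```

Without a scheduler the callable runs on the thread that called subscribe, which is the behavior described in the question. With subscribeOn, subscribe only hands the work over and returns, so the caller needs something like the latch above if it wants to wait for the result.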

This hints at a very important point about reactive programming, though: pipelines should not contain long blocking calls. A reactive pipeline is meant to be prepared up front and then, once subscribed to, to process events without blocking. Blocking statements therefore have the very real potential to block the whole execution. With a Scheduler you can confine such calls to dedicated thread pools and thereby control their effect.
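One way to confine a blocking call to its own pool might look like the sketch below. The names BlockingPoolSketch, blockingCall, runOn, and the "blocking-io" prefix are hypothetical, and the pool sizes (4 threads, 100 queued tasks) are arbitrary numbers chosen for the demo, not recommendations.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class BlockingPoolSketch {

    // Hypothetical stand-in for a slow, blocking call (for example legacy I/O).
    static String blockingCall() throws InterruptedException {
        Thread.sleep(200);
        return Thread.currentThread().getName();
    }

    // Runs blockingCall() on the given scheduler and returns the worker thread's name.
    static String runOn(Scheduler scheduler) {
        return Mono.fromCallable(BlockingPoolSketch::blockingCall)
                .subscribeOn(scheduler)
                // block() is used only so this demo can wait for the result.
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        // Dedicated pool for blocking work: at most 4 threads, at most 100 queued tasks.
        Scheduler blockingPool = Schedulers.newBoundedElastic(4, 100, "blocking-io");
        try {
            System.out.println("blocking call ran on: " + runOn(blockingPool));
        } finally {
            // Release the pool's threads so the JVM can exit.
            blockingPool.dispose();
        }
    }
}
```

The point of the separate scheduler is isolation: if the blocking calls pile up, they exhaust only this pool's threads and queue rather than the threads the rest of the pipeline runs on.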