0
votes

I am trying to run a basic example with Project Reactor and its Flux class. The source should create integers from 1 to 10 and just print out the emitted integers.

All examples are executed in the main method of the application with no other code running.

It is fairly easy to get the basics running:

Flux.range(1, 10).subscribe(System.out::println);

The next step is to emit the integers in another thread. This can be achieved by

        Flux.range(1, 10)
                .publishOn(Schedulers.newSingle("OtherThread"))
                .subscribe(System.out::println);

As the project reference states, Schedulers.newSingle("OtherThread") creates "a per-call dedicated thread" (see Project Reactor Reference). The reference explains that there is also a Schedulers.single() which gives access to the execution context of "a single, reusable thread" and "reuses the same thread for all callers".

As I am using threading at just one point in this example publishOn(...) my understanding is, that both methods (newSingle(...) and single()) could be used interchangeably.

        Flux.range(1, 10)
                .publishOn(Schedulers.single())
                .subscribe(System.out::println);

But the last example does not print out anything. And to be honest, after several hours of searching and playing around I am not getting why.

I found this blog article Flight of the Flux 3 - Hopping Threads and Schedulers which states single() as "for one-off tasks that can be run on a unique ExecutorService". But it does not bring light into the darkness.

As often I expect there is a simple answer to the question Why do newSingle(...) and single() behave differently in this basic exmaple? which will make me feel stupid. But I would be more than happy if it just finally solves my confusion.

An interesting site note is, that by introducing log() the example prints like a charm

        Flux.range(1, 10)
                .log()
                .publishOn(Schedulers.single())
                .subscribe(System.out::println);

UPDATE: According to the answer of Martin Tarjányi I created a gist which demonstrates the different behavior with a small code snippet and explaining text.

1
How are you checking that it doesn't print anything out, i.e how are you running it? I have just c+p the code into an @Test and it runs fine, prints 1 to 10123

1 Answers

0
votes

When you create a new single scheduler using newSingle(String) by default it creates a new non-daemon thread, which means it will block the application from exiting until its thread pool is not shut down.

However, if you use the built-in single() it will use a daemon thread which will not prevent the application from exiting even if its work is not done yet. That's exactly what you see in your example: the main thread finished it work by assembling the reactive pipeline and the VM exits regardless of the state of the daemon single thread.

To have the same behavior in both cases you can replace you subscribe with a doOnNext() and a blockLast():

Flux.range(1, 10)
    .publishOn(Schedulers.single())
    .doOnNext(System.out::println)
    .blockLast();

Usually using block is strongly discouraged in reactive programming. However, if your main thread has nothing else to do it is fine calling block() on your reactive chain.