2
votes

I have read that the flatMap transformation is asynchronous. In the example below I print the thread name inside the lambda, and it prints the same thread on which the source was subscribed. As I understand it, it should print a different thread name, since this transformation must be performed on a different thread.

Flux.just(1, -2, 3, 4, -5, 6)
    .flatMap(element -> { 
        try { 
            Thread.sleep(1000);
        } 
        catch (InterruptedException e) {
            e.printStackTrace(); 
        } 

        System.out.println(Thread.currentThread().getName() + " element: " + element); 
        return Flux.just(element);
    })
    .subscribe();

3 Answers

2
votes

The fact it's asynchronous doesn't necessarily imply that it runs in parallel, which you seem to be expecting here. However, you can convert the Flux into a ParallelFlux and specify a parallel scheduler:

Flux.just(1, -2, 3, 4, -5, 6)
    .parallel()
    // Schedulers.elastic() is deprecated since Reactor 3.4; boundedElastic() is its replacement
    .runOn(Schedulers.boundedElastic())
    .flatMap(element -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + " element: " + element);
        return Flux.just(element);
    })
    .subscribe();
Thread.currentThread().join(); // Just a hack to keep the program alive.

If, on the other hand, you don't want it to run in parallel but just on a thread other than your main thread, there is no need to convert it to a ParallelFlux. Just add a .subscribeOn(Schedulers.boundedElastic()) call (Schedulers.elastic() on Reactor versions before 3.4) or similar instead.

1
votes

No, it won't print a different thread name. By default, Reactor runs the whole pipeline on a single thread: the one on which the subscription happens. In this case the subscription happens on the calling thread, so all the elements are emitted on it and processed further on it.
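
To check this for yourself, here is a minimal sketch that records which thread runs the flatMap lambda and compares it with the subscribing thread. The class and method names (FlatMapThreadDemo, mapperThreads) are made up for illustration, and it assumes reactor-core is on the classpath.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

public class FlatMapThreadDemo {

    // Hypothetical helper: returns the name of the thread that ran the
    // flatMap lambda, once per element.
    static List<String> mapperThreads() {
        List<String> names = new CopyOnWriteArrayList<>();
        Flux.just(1, -2, 3, 4, -5, 6)
            .flatMap(element -> {
                names.add(Thread.currentThread().getName());
                return Flux.just(element);
            })
            // No scheduler is involved, so this call only returns after
            // all six elements have gone through the lambda.
            .subscribe();
        return names;
    }

    public static void main(String[] args) {
        System.out.println("subscribed on: " + Thread.currentThread().getName());
        System.out.println("mapper ran on: " + mapperThreads());
    }
}
```

Every recorded name should match the thread that called subscribe(), which is the behaviour you are seeing.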

Let us first try to understand how .flatMap works:

  1. For each upstream event, you create a stream out of it.
  2. flatMap subscribes to each inner stream eagerly (that's why they are even triggered). Note that eagerly here means it subscribes to all the inner streams at once, without waiting for other inner streams to complete (unlike concatMap).
  3. It dynamically merges all the inner streams as they emit events.

Note that each inner stream is, after all, a stream. Unless you tell it to use a different thread, each will emit on the same thread (the calling thread, which is what is happening in your case). If you want to process them in parallel, you can call .subscribeOn(Schedulers.boundedElastic()) (Schedulers.elastic() before Reactor 3.4), or .subscribeOn(Schedulers.parallel()), on each one of them:

Flux.just(1, -2, 3, 4, -5, 6)
    .flatMap(
        element -> {
          //this will always print the same thread - the calling thread basically
          System.out.println(Thread.currentThread().getName() + " element: " + element);
          return Mono.just(element)
              .subscribeOn(Schedulers.parallel())
              .doOnNext(
                  a ->
                      System.out.println(
                          a + " emitted on thread: " + Thread.currentThread().getName()));
        })
    .subscribe();

.flatMap doesn't care which thread an element comes in on or which thread it goes out on. All it does is subscribe to the inner streams and merge their events as and when they arrive. Remember that each of the inner streams is a 'promise'. It will complete eventually (with an onComplete signal), but you don't know when, and flatMap still doesn't care. concatMap, however, waits for one inner stream to complete before subscribing to the next. That's how it maintains the order of the original stream.
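
The ordering difference can be made visible with a small sketch. The delayed helper below is hypothetical: it builds an inner stream that emits n after n * 100 ms, so larger values finish later. It assumes reactor-core is on the classpath; delayElement, flatMap, concatMap, collectList and block are standard Reactor operators.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapVsConcatMap {

    // Hypothetical helper: an inner stream that emits n after n * 100 ms.
    static Mono<Integer> delayed(int n) {
        return Mono.just(n).delayElement(Duration.ofMillis(n * 100L));
    }

    // flatMap subscribes to all inner streams at once and merges them,
    // so results come out in completion order.
    static List<Integer> withFlatMap() {
        return Flux.just(3, 1, 2)
                .flatMap(FlatMapVsConcatMap::delayed)
                .collectList()
                .block();
    }

    // concatMap subscribes to the next inner stream only after the
    // previous one completes, so results keep the source order.
    static List<Integer> withConcatMap() {
        return Flux.just(3, 1, 2)
                .concatMap(FlatMapVsConcatMap::delayed)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:   " + withFlatMap());
        System.out.println("concatMap: " + withConcatMap());
    }
}
```

With delays this far apart, flatMap should give the values in completion order (1, 2, 3) while concatMap keeps the source order (3, 1, 2), at the cost of waiting for each inner stream in turn.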

Read more on this here (my own article on flatMap):

https://medium.com/swlh/understanding-reactors-flatmap-operator-a6a7e62d3e95

0
votes

Using flatMap doesn't affect the thread on which the pipeline executes. You can use subscribeOn to choose the thread on which execution will occur:

Flux.just(1, -2, 3, 4, -5, 6)
    .flatMap(element -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + " element: " + element);
        return Flux.just(element);
    })
    // Schedulers.elastic() is deprecated since Reactor 3.4; boundedElastic() is its replacement
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe();

Depending on the behaviour you want, you can use any of the following: Schedulers.boundedElastic() (which replaces the deprecated Schedulers.elastic()), Schedulers.single(), Schedulers.parallel(), or Schedulers.immediate().
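
As a rough way to compare them, the sketch below asks each scheduler which thread it runs a task on. The class and the threadNameOn helper are made up for illustration, and it assumes reactor-core is on the classpath.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SchedulerThreadNames {

    // Hypothetical helper: subscribes on the given scheduler and returns
    // the name of the thread that actually ran the callable.
    static String threadNameOn(Scheduler scheduler) {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .block();
    }

    public static void main(String[] args) {
        System.out.println("caller:         " + Thread.currentThread().getName());
        // immediate() runs the work on the calling thread, with no hop.
        System.out.println("immediate:      " + threadNameOn(Schedulers.immediate()));
        // single() reuses one dedicated thread.
        System.out.println("single:         " + threadNameOn(Schedulers.single()));
        // parallel() is a fixed pool sized to the CPU cores.
        System.out.println("parallel:       " + threadNameOn(Schedulers.parallel()));
        // boundedElastic() is a capped, growing pool meant for blocking work.
        System.out.println("boundedElastic: " + threadNameOn(Schedulers.boundedElastic()));
    }
}
```

Only Schedulers.immediate() should report the caller's own thread. The others report a thread owned by the scheduler, usually named after it.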