0
votes

I am trying to learn Reactor but I am having a lot of trouble with it. I wanted to do a very simple proof of concept where I simulate calling a slow down stream service 1 or more times. If you use reactor and stream the response the caller doesn't have to wait for all the results.

So I created a very simple controller but it is not behaving like I expect. When the delay is "inside" my flatMap (inside the method I call) the response is not returned until everything is complete. But when I add a delay after the flatMap the data is streamed.

Why does this code result in a stream of JSON

@GetMapping(value = "/test", produces = { MediaType.APPLICATION_STREAM_JSON_VALUE })
Flux<HashMap<String, Object>> customerCards(@PathVariable String customerId) {
    Integer count = service.getCount(customerId);

    return Flux.range(1, count).
            flatMap(k -> service.doRestCall(k)).delayElements(Duration.ofMillis(5000));

}

But this does not

@GetMapping(value = "/test2", produces = { MediaType.APPLICATION_STREAM_JSON_VALUE })
Flux<HashMap<String, Object>> customerCards(@PathVariable String customerId) {
    Integer count = service.getCount(customerId);

    return Flux.range(1, count).
            flatMap(k -> service.doRestCallWithDelay(k));

}

It think I am missing something very basic of the reactor API. On that note. can anyone point to a good book or tutorial on reactor? I can't seem to find anything good to learn this.

Thanks

2

2 Answers

1
votes

The code inside the flatMap runs on the main thread (that is the thread the controller runs). As a result the whole process is blocked and the method doesnt return immediately. Have in mind that Reactor doesnt impose a particular threading model.

On the contrary, according to the documentation, in the delayElements method signals are delayed and continue on the parallel default Scheduler. That means that the main thread is not blocked and returns immediately.

Here are two corresponding examples:

Blokcing code:

Flux.range(1, 500)
    .map(i -> {
            //blocking code
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " - Item : " + i);
            return i;
    })
   .subscribe();
    
    System.out.println("main completed");

Result:

main - Item : 1
main - Item : 2
main - Item : 3
...
main - Item : 500
main completed

Non-blocking code:

Flux.range(1, 500)
    .delayElements(Duration.ofSeconds(1))
    .subscribe(i -> {
        System.out.println(Thread.currentThread().getName() + " - Item : " + i);
    });
    
System.out.println("main Completed");
    
//sleep main thread in order to be able to print the println of the flux
try {
    Thread.sleep(30000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Result:

main Completed
parallel-1 - Item : 1
parallel-2 - Item : 2
parallel-3 - Item : 3
parallel-4 - Item : 4
...
0
votes

Here is the project reactor reference guide "delayElements" method only delay flux element by a given duration, see javadoc for more details I think you should post details about methods "service.doRestCallWithDelay(k)" and "service.doRestCall(k)" if you need more help.