0
votes

I am using web reactive in spring web flux. I have implemented a Handler function for POST request. I want the server to return immediately. So, I have implemeted the handler as below -:

public class Sample implements HandlerFunction<ServerResponse>{

public Mono<ServerResponse> handle(ServerRequest request) {

Mono bodyMono = request.bodyToMono(String.class);

bodyMono.map(str -> {
  System.out.println("body got is " + str);
  return str;
}).subscribe();

return ServerResponse.status(HttpStatus.CREATED).build();
    }
}

But the print statement inside the map function is not getting called. It means the body is not getting extracted. If I do not return the response immediately and use

return bodyMono.then(ServerResponse.status(HttpStatus.CREATED).build())

then the map function is getting called.

So, how can I do processing on my request body in the background? Please help.

EDIT

I tried using flux.share() like below -:

Flux<String> bodyFlux = request.bodyToMono(String.class).flux().share();
Flux<String> processFlux = bodyFlux.map(str -> {
      System.out.println("body got is");
      try{
        Thread.sleep(1000);
      }catch (Exception ex){

      }
      return str;
    });

    processFlux.subscribeOn(Schedulers.elastic()).subscribe();

    return bodyFlux.then(ServerResponse.status(HttpStatus.CREATED).build());

In the above code, sometimes the map function is getting called and sometimes not.

2
The code you posted doesn't compile. Post actual compiling code, that reproduces the issue. - JB Nizet
I have edited the question to post whole class. - Arunim Chopra
And the code still doesn't compile. - JB Nizet
now try to compile - Arunim Chopra

2 Answers

0
votes

As you've found, you can't just arbitrarily subscribe() to the Mono returned by bodyToMono(), since in that case the body simply doesn't get passed into the Mono for processing. (You can verify this by putting a single() call in that Mono, it'll throw an exception since no element will be emitted.)

So, how can I do processing on my request body in the background?

If you really still want to just use reactor to do a long task in the background while returning immediately, you can do something like:

return request.bodyToMono(String.class).doOnNext(str -> {
    Mono.just(str).publishOn(Schedulers.elastic()).subscribe(s -> {
        System.out.println("proc start!");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("proc end!");
    });
}).then(ServerResponse.status(HttpStatus.CREATED).build());

This approach immediately publishes the emitted element to a new Mono, set to publish on an elastic scheduler, that is then subscribed in the background. However, it's kind of ugly, and it's not really what reactor is designed to do. You may be misunderstanding the idea behind reactor / reactive programming here:

  • It's not written with the idea of "returning a quick result and then doing stuff in the background" - that's generally the purpose of a work queue, often implemented with something like RabbitMQ or Kafka. It's "raison d'ĂȘtre" is instead to be non-blocking, so a single thread is never idly blocked, waiting for something else to complete.
  • The map() method isn't designed for side effects, it's designed to transform each object into another. For side effects, you want doOnNext() instead;
  • Reactor uses a single thread by default, so your "additional processing" in your map() method would still block that thread.

If your application is for anything more than quick demo purposes, and/or you need to make heavy use of this pattern, then I'd seriously consider setting up a proper work queue instead.

0
votes

This is not possible.

Web servers (including Reactor Netty, Tomcat, etc) clean up and recycle resources when request processing is done. This means that when your controller handler is done, the HTTP resources, the request itself, reusable buffers, etc are recycled or closed. At that point, you cannot read from the request body anymore.

In your case, you need to read and buffer the whole request body first, then return a response and kick off a task for processing that request in a separate execution.