
Is it possible with Project Reactor to have a Mono wait for an event or condition without needing a blocking thread per Mono? With a CompletableFuture I can pull this off, but I can't see how to do it with Project Reactor.

My problem is that I need to correlate requests with responses. The response time varies wildly, and some requests will never get a reply and will time out. On the client side a blocking thread per request isn't a problem, but since this is a server application I don't want to end up spawning a thread per request that blocks while waiting for a response.

The API looks something like this:

Mono<Response> doRequest(Mono<Request> request);

Since I don't know how to do it with Reactor I will explain how to do it with a CompletableFuture to clarify what I'm looking for. The API would look like this:

CompletableFuture<Response> doRequest(Request request);

When invoked by a caller, the method sends a request to a server; the request contains a correlation ID generated by this method. The caller is returned a CompletableFuture, and the method stores a reference to this CompletableFuture in a map with the correlation ID as key.

There is also a thread (pool) which receives all the responses from the server. When it receives a response, it takes the correlation ID from the response, uses it to look up the original request (i.e. the CompletableFuture) in the map, and calls complete(response) on it.

In this implementation you don't need a blocking thread per request. I suppose this is more of a Vert.x / Netty way of thinking. I would like to know how to implement such a thing (if possible) with Project Reactor.

EDIT 25-07-2019:

As requested in the comments, below is an example of how I would implement this with CompletableFutures, to clarify what I'm getting at.

I also noticed I made a mistake which might have been rather confusing: in the CompletableFuture example I passed a Mono as the argument. That should have been just a "normal" argument. My apologies; I hope it didn't confuse people too much.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class NonBlockingCorrelatingExample {

    /**
     * This example shows how to correlate requests with responses using {@link CompletableFuture}s, without
     * needing a (sleeping) thread per request to wait for the response.
     *
     * The main feat of this example is that a fixed (small) number of threads is used, even if one
     * were to fire thousands of requests.
     */
    public static void main(String[] args) throws Exception {

        RequestResponseService requestResponseService = new RequestResponseService();

        Request request = new Request();
        request.correlationId = 1;
        request.question = "Do you speak Spanish?";

        CompletableFuture<Response> responseFuture = requestResponseService.doRequest(request);
        responseFuture.whenComplete((response, throwable) -> System.out.println(response.answer));

        // The blocking call here is just so the application doesn't exit until the demo is completed.
        responseFuture.get();
    }

    static class RequestResponseService {

        /** The key in this map is the correlation ID. */
        private final ConcurrentHashMap<Long, CompletableFuture<Response>> responses =  new ConcurrentHashMap<>();

        CompletableFuture<Response> doRequest(Request request) {
            CompletableFuture<Response> responseFuture = new CompletableFuture<>();
            // Register the future before sending, so a fast response cannot arrive before it is in the map.
            responses.put(request.correlationId, responseFuture);

            doNonBlockingFireAndForgetRequest(request);

            return responseFuture;
        }

        private void doNonBlockingFireAndForgetRequest(Request request) {
            // In my case this is where the request would be published on an MQTT broker (message bus) in a request topic.
            // Right now we will just make a call which will simulate a response message coming in after a while.
            simulateResponses();
        }

        private void processResponse(Response response) {
            // There would usually be a (small) thread pool which is subscribed to the message bus which receives messages
            // in a response topic and calls this method to handle those messages.
            // Remove (rather than get) the entry so answered requests do not pile up in the map.
            CompletableFuture<Response> responseFuture = responses.remove(response.correlationId);
            if (responseFuture != null) {
                responseFuture.complete(response);
            }
        }

        void simulateResponses() {
            // This is just to make the example work. Not part of the example.
            new Thread(() -> {
                try {
                    // Simulate a delay.
                    Thread.sleep(10_000);

                    Response response = new Response();
                    response.correlationId = 1;
                    response.answer = "Si!";

                    processResponse(response);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    static class Request {
        long correlationId;
        String question;
    }

    static class Response {
        long correlationId;
        String answer;
    }

}
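
The question mentions that some requests never get a reply, which the example above does not cover. The following is only a minimal sketch of how the CompletableFuture variant might handle that case, assuming Java 9 or later for `orTimeout`. The class `TimeoutAwareCorrelator` and its methods are hypothetical names, and plain `String` answers stand in for the `Response` type to keep the block short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original example.
class TimeoutAwareCorrelator {

    /** The key in this map is the correlation ID. */
    private final ConcurrentHashMap<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request; call this before the request is actually sent. */
    CompletableFuture<String> register(long correlationId, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        // Completes the future exceptionally with a TimeoutException if no answer arrives in time.
        future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        // Whatever the outcome (answer, timeout, cancellation), drop the map entry.
        future.whenComplete((answer, error) -> pending.remove(correlationId, future));
        return future;
    }

    /** Called by the response-handling thread (pool). Returns false for unknown or late responses. */
    boolean complete(long correlationId, String answer) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(answer);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

No thread is parked while a request is pending: the timeout is scheduled by the JDK, and a response that arrives after the timeout is simply reported as unmatched by `complete`.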
Could you share a dummy example code with CompletableFuture that shows how it works right now? - Martin Tarjányi
While this is an interesting question (+1), I wonder if this falls under the guise of premature optimisation - have you tested / benchmarked whether all these blocking threads sitting there are actually an issue in practice? I suspect they won't be and the worst that happens is they may sit there using some memory, but I've never tried... - Michael Berry
@MichaelBerry I agree that premature optimization is something to be wary of. I haven't done any benchmarking, but the reason why frameworks like Netty and Vert.x scale well is that they work in an event-driven fashion instead of having thousands of sleeping threads. If we were to conclude that a thread per (for example) connection isn't a problem, then I could make life even simpler and just use a single thread for each new connection and simply make a bunch of blocking calls in sequence in that thread. That would be way less complex than using Reactor. - Jasper Siepkes
Can you expand on the Mono<Request> part of your API? Mono vs CompletableFuture usually implies laziness on Mono's part: nothing should happen before the Mono is actually subscribed/requested. How does that play in this API? - Simon Baslé
@SimonBaslé @Yossarian I've added a working (single class) example of how I would implement it with a CompletableFuture. I suspect you might also have been thrown off by a mistake I made in describing the CompletableFuture API: it shouldn't have taken a Mono as argument. That mistake might have implied I'm trying to combine the two, but that's not the case. Sorry about that. - Jasper Siepkes

1 Answer


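Yes, it is possible. You can use the reactor.core.publisher.Mono#create method to achieve it. It hands you a MonoSink that you can keep a reference to and complete later from any thread, much like calling complete on a CompletableFuture.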

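For your example (the snippets below go inside the same outer class as before and additionally need java.util.function.Consumer and reactor.core.publisher.Mono imported):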

public static void main(String[] args) throws Exception {
    RequestResponseService requestResponseService = new RequestResponseService();

    Request request = new Request();
    request.correlationId = 1;
    request.question = "Do you speak Spanish?";


    Mono<Request> requestMono = Mono.just(request)
            .doOnNext(rq -> System.out.println(rq.question));
    requestResponseService.doRequest(requestMono)
            .doOnNext(response -> System.out.println(response.answer))
            // The blocking call here is just so the application doesn't exit until the demo is completed.
            .block();
}

static class RequestResponseService {
    private final ConcurrentHashMap<Long, Consumer<Response>> responses =
            new ConcurrentHashMap<>();

    Mono<Response> doRequest(Mono<Request> request) {
        return request.flatMap(rq -> doNonBlockingFireAndForgetRequest(rq)
                .then(Mono.create(sink -> responses.put(rq.correlationId, sink::success))));
    }

    private Mono<Void> doNonBlockingFireAndForgetRequest(Request request) {
        return Mono.fromRunnable(this::simulateResponses);
    }

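    private void processResponse(Response response) {
        // Remove (rather than get) the entry so answered requests do not pile up in the map.
        Consumer<Response> consumer = responses.remove(response.correlationId);
        if (consumer != null) {
            consumer.accept(response);
        }
    }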

    void simulateResponses() {
        // This is just to make the example work. Not part of the example.
        new Thread(() -> {
            try {
                // Simulate a delay.
                Thread.sleep(10_000);

                Response response = new Response();
                response.correlationId = 1;
                response.answer = "Si!";

                processResponse(response);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
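
Two things the question asks for are not handled above: requests that never get a reply, and cleaning up the map. The following is only a minimal sketch of one way this might be done with `Mono.create`, assuming reactor-core is on the classpath. The class `CorrelatingService`, the `transport` callback, and the method names are hypothetical, and plain `String` payloads stand in for `Request` and `Response`. Unlike the code above, it registers the sink before sending the request, so a very fast response cannot arrive before its sink is in the map.

```java
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

// Hypothetical sketch: all names here are illustrative, not taken from the answer above.
class CorrelatingService {

    /** Correlation ID -> sink of the Mono that is still waiting for its response. */
    private final ConcurrentHashMap<Long, MonoSink<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    /** Assumed non-blocking transport, e.g. a publish to a request topic on a message bus. */
    private final BiConsumer<Long, String> transport;

    CorrelatingService(BiConsumer<Long, String> transport) {
        this.transport = transport;
    }

    Mono<String> doRequest(String question, Duration timeout) {
        // Nothing in this lambda runs until the returned Mono is subscribed to.
        return Mono.<String>create(sink -> {
            long id = nextId.incrementAndGet();
            // Register before sending so that even an immediate response finds its sink.
            pending.put(id, sink);
            // Invoked on success, error and cancellation (including the timeout below).
            sink.onDispose(() -> pending.remove(id));
            transport.accept(id, question);
        }).timeout(timeout);
    }

    /** Called by the (small) thread pool that receives responses. */
    void onResponse(long correlationId, String answer) {
        MonoSink<String> sink = pending.remove(correlationId);
        if (sink != null) {
            sink.success(answer);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A caller would subscribe to `doRequest(...)` as usual. If no response arrives within the given duration, `timeout` signals a `TimeoutException` and cancels the upstream, which in turn triggers the `onDispose` cleanup.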