Is it possible with Project Reactor to wait in a Mono for an event or condition without needing a blocking thread per Mono? With a CompletableFuture I can pull such a thing off, but I can't see how to do it with Project Reactor.
My problem is that I need to correlate requests with responses. Response times vary wildly, and some requests will never get a reply and will time out. On the client side a blocking thread per request isn't a problem, but since this is a server application I don't want to end up spawning a thread per request that blocks waiting for a response.
The API looks something like this:
Mono<Response> doRequest(Mono<Request> request);
Since I don't know how to do it with Reactor, I will explain how I would do it with a CompletableFuture to clarify what I'm looking for. The API would look like this:
CompletableFuture<Response> doRequest(Request request);
When invoked by a caller, this method generates a correlation ID and sends a request containing it to the server. The caller is returned a CompletableFuture, and the method stores a reference to this CompletableFuture in a map with the correlation ID as key.
There is also a thread (pool) which receives all the responses from the server. When it receives a response, it takes the correlation ID from the response, uses it to look up the original request (i.e. the CompletableFuture) in the map, and calls complete(response); on it.
In this implementation you don't need a blocking thread per request. Is this basically more of a Vert.x / Netty way of thinking? I would like to know how to implement such a thing (if possible) with Project Reactor.
EDIT 25-07-2019:
As requested in the comments, to clarify what I'm getting at, below is an example of how I would implement this with CompletableFutures.
I also noticed I made a mistake which might have been rather confusing: in the CompletableFuture example I passed a Mono as argument. That should have been just a "normal" argument. My apologies, and I hope I didn't confuse people too much with it.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class NonBlockingCorrelatingExample {

    /**
     * This example shows how to correlate requests with responses without needing a (sleeping)
     * thread per request to wait for the response, using {@link CompletableFuture}s.
     *
     * The main feat of this example is that a fixed (small) number of threads is used even if one
     * would fire a thousand requests.
     */
    public static void main(String[] args) throws Exception {
        RequestResponseService requestResponseService = new RequestResponseService();

        Request request = new Request();
        request.correlationId = 1;
        request.question = "Do you speak Spanish?";

        CompletableFuture<Response> responseFuture = requestResponseService.doRequest(request);
        // The response is null when the future completed exceptionally, so check the throwable first.
        responseFuture.whenComplete((response, throwable) -> {
            if (throwable != null) {
                throwable.printStackTrace();
            } else {
                System.out.println(response.answer);
            }
        });

        // The blocking call here is just so the application doesn't exit until the demo is completed.
        responseFuture.get();
    }

    static class RequestResponseService {

        /** The key in this map is the correlation ID. */
        private final ConcurrentHashMap<Long, CompletableFuture<Response>> responses = new ConcurrentHashMap<>();

        CompletableFuture<Response> doRequest(Request request) {
            // Register the future before sending, so a fast response cannot arrive before it is in the map.
            CompletableFuture<Response> responseFuture = new CompletableFuture<>();
            responses.put(request.correlationId, responseFuture);

            doNonBlockingFireAndForgetRequest(request);

            return responseFuture;
        }

        private void doNonBlockingFireAndForgetRequest(Request request) {
            // In my case this is where the request would be published on an MQTT broker (message bus) in a request topic.
            // Right now we will just make a call which will simulate a response message coming in after a while.
            simulateResponses();
        }

        private void processResponse(Response response) {
            // There would usually be a (small) thread pool which is subscribed to the message bus which receives messages
            // in a response topic and calls this method to handle those messages.
            // remove() instead of get() so that completed requests do not pile up in the map.
            CompletableFuture<Response> responseFuture = responses.remove(response.correlationId);
            if (responseFuture != null) {
                responseFuture.complete(response);
            }
        }

        void simulateResponses() {
            // This is just to make the example work. Not part of the example.
            new Thread(() -> {
                try {
                    // Simulate a delay.
                    Thread.sleep(10_000);

                    Response response = new Response();
                    response.correlationId = 1;
                    response.answer = "Si!";

                    processResponse(response);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the interruption is not silently lost.
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }

    static class Request {
        long correlationId;
        String question;
    }

    static class Response {
        long correlationId;
        String answer;
    }
}
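Since some requests never get a reply, the map of pending futures also needs a way to give up on an entry. The following is only a minimal sketch of one way to do that with the JDK, assuming Java 9 or later for CompletableFuture.orTimeout. The class name TimeoutCorrelationSketch and the methods register and complete are hypothetical names for illustration, and a plain String stands in for the Response type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: pending requests that clean themselves up on reply or timeout.
class TimeoutCorrelationSketch {

    /** Pending requests keyed by correlation ID. */
    static final ConcurrentHashMap<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request and returns a future that fails if no reply arrives in time. */
    static CompletableFuture<String> register(long correlationId, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        // orTimeout (Java 9+) completes the future with a TimeoutException after the delay.
        // The JDK schedules this internally; no thread sleeps per request.
        future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        // The returned stage completes only after the map entry has been removed,
        // whether the request ended with a reply, a timeout, or another failure.
        return future.whenComplete((answer, error) -> pending.remove(correlationId, future));
    }

    /** Called by the response-handling thread; returns false if nobody is waiting any more. */
    static boolean complete(long correlationId, String answer) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(answer);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> answered = register(1L, 1_000);
        CompletableFuture<String> unanswered = register(2L, 100);

        complete(1L, "Si!");
        System.out.println(answered.get());

        try {
            unanswered.get();
        } catch (ExecutionException e) {
            System.out.println("Timed out: " + (e.getCause() instanceof TimeoutException));
        }
        System.out.println("Pending entries left: " + pending.size());
    }
}
```

One design choice to be aware of: the caller receives the stage returned by whenComplete rather than the future stored in the map, so cancelling the returned future does not by itself remove the map entry; the timeout still does.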
Mono<Request> part of your API? Mono vs CompletableFuture usually imply laziness on Mono's part: nothing should happen before the Mono is actually subscribed/requested. How does that play in this API? – Simon Baslé
CompletableFuture. I suspect you might also have been thrown off by a mistake I made with describing the CompletableFuture API; it shouldn't have taken a Mono as argument. That mistake might have implied I'm trying to combine the two, but that's not the case. Sorry about that. – Jasper Siepkes