I've been reading through the Reactor documentation, but I was not able to find a proper pattern for the following problem. I have a method that is supposed to do something asynchronously. It returns the result responses in the form of a Flux, and the consumer can subscribe to it.
The method has the following definition:
Flux<ResultMessage> sendRequest(RequestMessage message);
The returned Flux is a hot Flux: results can arrive asynchronously at any time.
A potential consumer could use it in the following manner:
sendRequest(message).subscribe(response -> doSomethinWithResponse(response));
An implementation could look like this:
Flux<ResultMessage> sendRequest(RequestMessage message) {
    Flux<ResultMessage> result = incomingMessageStream
        .filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
        .take( 2 );
    // The message sending is done here...
    return result;
}
Here, incomingMessageStream is a Flux of all messages going through this channel.
The problem with this implementation is that the consumer subscribes only after the request has been sent, so result messages may already be arriving by then, and the consumer can miss some of them.
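To make the race concrete, here is a minimal sketch that reproduces it. It assumes Reactor 3.4 or later, uses plain String messages instead of ResultMessage, and uses a hypothetical Sinks-based hot source as a stand-in for incomingMessageStream; the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class LateSubscriberDemo {

    // Returns the messages seen by a consumer that subscribes after the
    // first reply has already been emitted.
    static List<String> run() {
        // Hypothetical stand-in for incomingMessageStream: a hot source that
        // drops signals while nobody is subscribed.
        Sinks.Many<String> incoming = Sinks.many().multicast().directBestEffort();

        // Same shape as the result Flux in sendRequest (filter omitted for brevity).
        Flux<String> result = incoming.asFlux().take(2);

        // The first reply arrives before the consumer subscribes, so it is lost.
        incoming.tryEmitNext("reply-1");

        List<String> seen = new ArrayList<>();
        result.subscribe(seen::add);

        // The second reply arrives after subscription and is delivered.
        incoming.tryEmitNext("reply-2");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [reply-2]
    }
}
```

The consumer only sees the second reply, because nothing was listening when the first one was emitted.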
So, what I am looking for is a solution that does not make the consumer depend on the time of subscription. A potential consumer may not even be required to subscribe to the resulting Flux at all. I am looking for a general solution, but if that is not possible, you can assume that the number of result messages is not greater than 2.
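For illustration, one possible direction is to start listening eagerly and buffer the replies, so a late subscriber still receives them. The sketch below is not a definitive solution. It assumes Reactor 3.4 or later, String messages, the "at most 2 replies" bound mentioned above, and the same hypothetical Sinks-based source standing in for incomingMessageStream. It relies on Flux.replay(int) together with autoConnect(0), which connects to the source immediately instead of waiting for a subscriber.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class EagerReplayDemo {

    // Wraps a hot source so that up to two replies are captured immediately
    // and replayed to whoever subscribes later (if anyone does).
    static Flux<String> eagerReplies(Flux<String> incoming) {
        return incoming
                .take(2)          // assumed upper bound on the number of replies
                .replay(2)        // keep the last two signals for late subscribers
                .autoConnect(0);  // connect to the source right now
    }

    static List<String> run() {
        // Hypothetical stand-in for incomingMessageStream.
        Sinks.Many<String> incoming = Sinks.many().multicast().directBestEffort();

        // Start capturing before the request would be sent.
        Flux<String> result = eagerReplies(incoming.asFlux());

        // Both replies arrive before the consumer subscribes.
        incoming.tryEmitNext("reply-1");
        incoming.tryEmitNext("reply-2");

        List<String> seen = new ArrayList<>();
        result.subscribe(seen::add); // the late subscriber gets the buffered replies
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [reply-1, reply-2]
    }
}
```

In sendRequest, the wrapped Flux would have to be created before the message is sent. For an unbounded number of replies, the bounded replay(2) would need to be swapped for an unbounded buffer, at the cost of holding every reply in memory.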