I'm interfacing with an ANT+ usb stick and am replacing my own naive "MessageBus" with project reactor as it seems great fit.
The usb interface is inherently asynchronous (separate input/output pipes) and I want to treat a group of request/response messages in a blocking manner.
I already set up a separate thread which continuously reads messages from the usb in-pipe and writes them to a sink which feeds a shared Flux to which anyone can subscribe. This seems to work fine.
Currently I send a message to the usb pipe, then use .filter() and .blockFirst() on the shared flux: (contrived code)
/**
* Puts a message on the Usb out Pipe and waits for the relevant asynchronous response on the {@link AntUsbReader#antMessages()} {@link Flux}
*
* @param message Message to send.
* @return related response message.
*/
public AntMessage sendBlocking(AntBlockingMessage message) {
send(message); // in essence, calls usbOutPipe.syncSubmit(message.getBytes()), returns void
// bug: ant dongle can reply to message even before following Flux is activated, meaning .blockFirst() goes in timeout.
return this.antUsbMessageReader.antMessages() // .antMessages() is an (infinite) Flux<AntMessage>
.filter(antMessage -> antMessage.getMessageId() == message.getMessageId())
.blockFirst(Duration.ofSeconds(10));
}
Problem is that the usb stick can respond even before the flux is activated, causing a TimeoutException.
Adding a Thread.sleep(10)
to the usb reader "solves" the issue but what would be the correct way to implement this blocking behaviour?
- Setting up a subscription (using .take(1)), sending the message and then blocking on the subscription?
- Set up a Flux in which both the sending and wait for correct response is done?
I couldn't figure this out...
Currently I send a message to the usb pipe, then use .filter() and .blockFirst() on the shared flux
. But in your code yousend(message)
first then youantUsbMessageReader.antMessages()
and what that is i have no idea. Please post your full code and explain step by step what is happening in a more detailed way. – Toerktumlaresend(message)
at the second stem. Then return to flux created on the first step – Igor Artamonov