0
votes

I'm interfacing with an ANT+ USB stick and am replacing my own naive "MessageBus" with Project Reactor, as it seems a great fit.
The USB interface is inherently asynchronous (separate input/output pipes), and I want to treat a group of request/response messages in a blocking manner.

I already set up a separate thread that continuously reads messages from the USB in-pipe and writes them to a sink, which feeds a shared Flux that anyone can subscribe to. This seems to work fine.

Currently I send a message to the USB pipe, then use .filter() and .blockFirst() on the shared Flux (contrived code):

    /**
     * Puts a message on the USB out-pipe and waits for the relevant asynchronous response
     * on the {@link AntUsbReader#antMessages()} {@link Flux}.
     *
     * @param message message to send
     * @return the related response message
     */
    public AntMessage sendBlocking(AntBlockingMessage message) {
        send(message); // in essence, calls usbOutPipe.syncSubmit(message.getBytes()), returns void
        // Bug: the ANT dongle can reply before the Flux below is subscribed, in which case .blockFirst() times out.
        return this.antUsbMessageReader.antMessages() // .antMessages() is an (infinite) Flux<AntMessage>
                .filter(antMessage -> antMessage.getMessageId() == message.getMessageId())
                .blockFirst(Duration.ofSeconds(10));
    }

The problem is that the USB stick can respond even before the Flux is subscribed, causing a TimeoutException.
Adding a Thread.sleep(10) to the USB reader "solves" the issue, but what would be the correct way to implement this blocking behaviour?

  • Set up a subscription (using .take(1)), send the message, and then block on the subscription?
  • Set up a Flux in which both the sending and the waiting for the correct response are done?

I couldn't figure this out...
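
To make the race concrete, here is a minimal sketch of the failure mode. It assumes the shared Flux behaves like a hot publisher, meaning a subscriber only sees messages published after it subscribed. The JDK's `SubmissionPublisher` stands in for the shared Flux so the example runs without Reactor, and the class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class LateSubscriberDemo {
    // Returns true if the subscriber saw the message within the given time.
    // subscribeFirst = false mimics "send, then start listening".
    static boolean received(boolean subscribeFirst, long timeoutMillis) {
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            CountDownLatch gotMessage = new CountDownLatch(1);
            Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Integer item) { gotMessage.countDown(); }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
            if (subscribeFirst) {
                publisher.subscribe(subscriber);
            }
            publisher.submit(42); // the "reply" arrives; dropped if nobody is subscribed yet
            if (!subscribeFirst) {
                publisher.subscribe(subscriber); // too late, the reply is already gone
            }
            return gotMessage.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("subscribe first: " + received(true, 2000));
        System.out.println("send first:      " + received(false, 200));
    }
}
```

Only the subscribe-first ordering should see the message, which is the same situation as calling send(message) before .blockFirst() subscribes.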

Your question and code are very unclear. You write "Currently I send a message to the usb pipe, then use .filter() and .blockFirst() on the shared flux", but in your code you call send(message) first and then antUsbMessageReader.antMessages(), and I have no idea what that is. Please post your full code and explain step by step, in more detail, what is happening. - Toerktumlare
Thanks for the reaction. I was hoping it was clear from the explanation plus the contrived code. Posting the full code is not possible at the moment, since I'm in the middle of an overhaul of the internal message-passing logic. For now I'll add comments to the code, and I'll see if I can refactor the full code into something I can post, or write a code sample that demonstrates the problem. - G. Vermeylen
Can you just send it after making the Flux? Like your working solution, but without the Runnable Mono: create the Flux in the first step, call send(message) in the second step, then return to the Flux created in the first step. - Igor Artamonov
That was my original attempt, because it seems the most logical approach: set up the Flux to start listening, send the message, block, and get the response message. But a Flux is only active once subscribed (which is what block does), and I can't block before actually sending the message :). That said, I do see my project evolving to become fully reactive, so there will be no more need for blocking, but for now I'm just migrating the core. - G. Vermeylen

2 Answers

0
votes

I arrived at a working solution, but I'm not sure if it's the best:

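I set up a Mono for sending the asynchronous message and merge it with a Flux that filters for a matching message. Since the Mono never emits a value, I know the first object from the merge is the response message from my Flux, so I can cast it to the correct type. As far as I can tell, this works because merge subscribes to response before it subscribes to messageSender, so the filter is already listening when the write happens.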

This still feels a bit dirty, but then again, trying to get blocking behaviour with a framework meant for async work will always feel a bit dirty...

    public AntMessage sendBlocking(AntBlockingMessage requestMessage) {
        Flux<AntMessage> response = this.antUsbReader.antMessages()
                .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))
                .take(1);

        Mono<Void> messageSender = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage));
        return (AntMessage) Flux.merge(response, messageSender).blockFirst(Duration.ofSeconds(1));
    }

    private boolean isMatchingResponse(AntBlockingMessage message, AntMessage response) {
        if (message instanceof RequestMessage) {
            return response.getMessageId() == ((RequestMessage) message).getMsgIdRequested();
        }
        return response.getMessageId() == message.getMessageId();
    }
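
A variant of the same idea that avoids the merge and the cast is to subscribe first, send, and only then block. With Reactor this could be `antMessages().filter(...).next().toFuture()`, since `toFuture()` subscribes immediately. The sketch below shows only the ordering and deliberately uses plain `java.util.concurrent` so it runs without dependencies. The listener list, `publish`, and `send` are hypothetical stand-ins for the shared Flux, the reader thread, and the USB writer, not the actual code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

class SubscribeBeforeSend {
    // Hypothetical stand-in for the shared hot stream: a listener only sees
    // message ids published after it was registered.
    static final List<IntConsumer> listeners = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for the reader thread delivering a message id.
    static void publish(int messageId) {
        for (IntConsumer listener : listeners) {
            listener.accept(messageId);
        }
    }

    // Hypothetical stand-in for the USB writer; this "dongle" replies
    // immediately, which is the worst case for the race.
    static void send(int messageId) {
        publish(messageId);
    }

    static int sendBlocking(int messageId) {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        IntConsumer listener = id -> {
            if (id == messageId) {
                pending.complete(id);
            }
        };
        listeners.add(listener);   // 1. start listening
        try {
            send(messageId);       // 2. send the request
            // 3. block; join() throws CompletionException if no reply arrives in time
            return pending.orTimeout(1, TimeUnit.SECONDS).join();
        } finally {
            listeners.remove(listener); // always unregister
        }
    }

    public static void main(String[] args) {
        System.out.println("response id: " + sendBlocking(0x4E));
    }
}
```

Because the listener is registered before send is called, even an immediate reply completes the future, and the blocking call only waits when the reply is genuinely late.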
0
votes

After looking at your code, I would suggest something along these lines. I wrote this on my phone, so I haven't tested it.

The idea is to write first and then wait up to 1 second for the filtered response.

    // Caveat: antMessages() is only subscribed once the write has completed,
    // so a reply that arrives before that point can still be missed.
    AntMessage response = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage))
            .thenMany(this.antUsbReader.antMessages()
                    .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))
                    .take(1))
            .blockFirst(Duration.ofSeconds(1));