1
votes

Spring integration offers non-reactive inbound/outbound WebSocket adapters which, simply put, associate session with id via internal container, you do some processing on message, and on outbound, it checks message headers for session id, and sends it via that session.

Now, with Spring offering reactive WebSocket support via org.springframework.web.reactive.socket.WebSocketSession and other classes there, I'm wondering of there is similar support in terms of channel adapters for reactive WebSocket stack.

If not, are there any common patterns/practices, how would one integrate reactive WS with spring-integration message flows?

2

2 Answers

1
votes

This feature hasn't been called yet, so we haven't thought on the matter.

Please, take a look into my SandBox. This is the best I can suggest with the current state of situation.

We just follow the standard Spring WebFlux suggestions to implement a WebSockets solution. So, we have there a WebSocketHandler implementation with an appropriate URL mapping. The implementation just forward a Flux from the session.receive() into the IntegrationFlow registered dynamically. The flow then converted to the Reactive Publisher which is used for the session.send().

I believe there can be used many other approaches, e.g. using a FluxMessageChannel bean and its subscribeTo() from this handle(WebSocketSession) impl to bridge the Flux into predefined Integration flow. Or simple @MessagingGateway call from the doOnNext().

Not sure, though, if session.send() can be used downstream independently (need to play), but you can see in the sample how I propagate a WebSocketSession into the MessageHeaders to give it access in the Integration flow.

0
votes

Thanks for your insights, Artem, they helped a lot.

Here's what I ended up doing:

From handler, I simply send received messages to my channel accepting them:

public class FakeWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        session.receive()
            .map(wsm -> MessageBuilder.withPayload(wsm).build())
            .subscribe(subscriberChannel::send);

        return Mono.never();
    }
}

And downstream, at the end of my flow, in my final service activator, I send the response:

public class FakeResponder extends AbstractMessageHandler {
    @Override
    protected void handleMessageInternal(Message<?> message) {
        final WebSocketSession session = ...; // obtain WebSocketSession for this message

        session.send(Mono.just(message.getPayload())
            .map(this::convertToSomeByteRepresentation)
            .map(bb -> session.binaryMessage(dbf -> dbf.wrap(bb)))
        ).subscribe();
    }
}