0
votes

I am building an application using Spring Websockets on a clustered tomcat environment with a RabbitMQ broker. I have an API module which needs to register the endpoint to listen to. I followed the normal examples and came up with this config:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer
{
    @Override
    public void configureMessageBroker(final MessageBrokerRegistry config)
    {
        config.enableStompBrokerRelay("/topic/")
            .setRelayHost("localhost")
            .setRelayPort(61613)
            .setClientLogin("guest")
            .setClientPasscode("guest");
    }

    @Override
    public void registerStompEndpoints(final StompEndpointRegistry registry)
    {
        registry.addEndpoint("/updates")
            .setAllowedOrigins("*")
            .withSockJS();
    }
}

While this works, it doesn't solve my issue as it appears the WebSocket and relay config are all bundled into the API module therefore leaving other layers unable to reuse the broker. I need the stomp message broker relay configuration to happen at the service layer so that other modules of our app can push messages to topics in RabbitMQ which then turn around and notify the API module to update all open websockets.

Below is a sample diagram of the relevant layers in our application and what I am trying to accomplish. I need to allow the module "Cron Message Sender" to push messages to everyone who is subscribed to a message topic through our other API modules.

Sample Application Layers

2
The other possible idea I am exploring right now is to break up the two into completely separate components. By that I mean that each API module would run it's own independent websocket configuration using a simple broker (no relay). Then, at the service layer, I would connect each service instance together by directly talking to rabbitmq via AMQP. Then, each API module would listen for incoming AMQP messages and forward them to their respective users over Stomp WS. I am still new to messaging and not sure if this would be better or worse in terms of performance in a production environment.Greg Marut
Assuming this approach works, this would allow me to decouple the components allowing for greater flexibility if I decide to move away from RabbitMQ in favor of some other message broker (like AWS SQS).Greg Marut

2 Answers

1
votes

So the second approach did in fact work. I configured the websockets to be run independently (no relay) and then I made a separate AMQP message broker connection at the service layer to allow communication between services. In the API module, I simply listened to the AMQP message broker and then manually forwarded those messages to the SimpMessagingTemplate which notified the websocket subscribers. I am not sure if this is technically the "right" way to do it but it seems to be working great and I do not yet see any issues with the implementation. In fact, I actually think I may prefer this approach as I now just gave all my services the ability to talk to each other with more types of messages than what I originally needed for the websockets.

Here is the new configuration:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer
{
    @Override
    public void configureMessageBroker(final MessageBrokerRegistry config)
    {
        config.enableSimpleBroker("/topic");
    }

    @Override
    public void registerStompEndpoints(final StompEndpointRegistry registry)
    {
        registry.addEndpoint("/updates")
            .setAllowedOrigins("*")
            .withSockJS();
    }
}

And here is where I listen to the message broker and forward the messages to the websocket subscribers:

@Component
public class SendWebSocketUpdates
{
    private static final Logger logger = LoggerFactory.getLogger(SendWebSocketUpdates.class);

    private final Gson gson;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private MessageBrokerConsumer<String> messageBrokerConsumer;

    public SendWebSocketUpdates()
    {
        this.gson = new Gson();
    }

    @PostConstruct
    public void init()
    {
        //listen for incoming AMQP messages from the rabbitmq server and forward them to the websocket subscribers
        messageBrokerConsumer.addListener((message, topicName) -> {
            final String destination = "/topic/" + topicName;
            final String messageJson = gson.toJson(message.getBody());

            //check to see if trace logging is enabled
            if (logger.isTraceEnabled())
            {
                logger.trace("Sending Message to \"{}\": {}", destination, messageJson);
            }

            //broadcast the via a STOMP message to subscribers of this topic
            messagingTemplate.convertAndSend(destination, messageJson);
        });
    }
}
-1
votes

It's easy to solve this problem. I waste a whole day to find the solution. Here 's my answer for the same problem.

The key is setUserDestinationBroadcast and setUserRegistryBroadcast:

registry.enableStompBrokerRelay("/topic/", "/queue/", "/exchange/") 
        .setUserDestinationBroadcast("/topic/log-unresolved-user") 
        .setUserRegistryBroadcast("/topic/log-user-registry")