
High-level architecture

JMS (Producer/Consumer) <----> Artemis(STOMP) <----> Websocket-Broker-Relay-Service <----> STOMP-over-Websocket-client (Producer/Consumer)

Some observations

  1. In the STOMP consumer, with a client-individual ack subscription, Artemis discards the message whether I ACK or NACK it. I want the message to be redelivered to the same consumer or to any other consumer. Is there a way to achieve this?

  2. In the JMS consumer, a durable message is not delivered if the consumer was down when the message arrived at Artemis. I expected the durable message to be delivered once the consumer service came back up.

class StompSessionHandlerImpl implements StompSessionHandler {
    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        session.setAutoReceipt(Boolean.FALSE);
        StompHeaders headers1 = new StompHeaders();
        headers1.setDestination("/queue/msg");
        headers1.add("durable-subscription-name", messagingUtil.getServiceSubscriptionChannel());
        headers1.add("Authorization", "Bearer ".concat(token));
        headers1.setAck("client-individual");
        session.subscribe(headers1, this);

    }

    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        session.acknowledge(Objects.requireNonNull(headers.getMessageId()), false);
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception) {
        synchronized (StompSessionHandlerImpl.msgSenderLock) {
            if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
                initStompSession();
            }
        }
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return COMessage.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        if (payload == null) return;
        COMessage msg = (COMessage) payload;
        try {
            stompMessagingService.handleReceivedMessages(msg);
            self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), true);
        } catch (Exception e) {
            self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), false);
        }
    }


    @PreDestroy
    public void cleanUp() {
        self.stompMessagingService.getStompSession().disconnect();
    }

}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
    @Bean
    public WebSocketStompClient stompClient() {
        WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
        List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
        SockJsClient sockJsClient = new SockJsClient(transports);
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        converter.setObjectMapper(objectMapper);
        stompClient.setMessageConverter(converter);
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10000);
        scheduler.initialize();
        stompClient.setTaskScheduler(scheduler);
        stompClient.setDefaultHeartbeat(new long[]{20000, 20000});
        stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
        ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
        return stompClient;
    }
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private String host;

    private String password;

    private String user;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/queue", "/topic", "/exchange")
                .setRelayHost(host)
                .setClientLogin(user)
                .setClientPasscode(password)
                .setSystemHeartbeatSendInterval(20000)
                .setSystemLogin(user)
                .setSystemPasscode(password)
                .setUserDestinationBroadcast("/topic/unresolved-user")
                .setUserRegistryBroadcast("/topic/log-user-registry");
        config.setApplicationDestinationPrefixes("/device");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
        registry.setErrorHandler(new StompSubProtocolErrorHandler());
    }

    @Bean
    public DefaultSimpUserRegistry getDefaultSimpRegistry() {
        return new DefaultSimpUserRegistry();
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setMessageSizeLimit(Integer.MAX_VALUE);
        registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
        registry.setTimeToFirstMessage(300000);
        registry.setSendTimeLimit(300000);
        registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
            @Override
            public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
                return new EmaWebSocketHandlerDecorator(webSocketHandler);
            }
        });

    }

}
class ArtemisConfig extends ArtemisAutoConfiguration {

    @Bean("mqConnectionFactory")
    public ConnectionFactory senderActiveMQConnectionFactory() {

        ActiveMQConnectionFactory connectionFactory =
               new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
        connectionFactory.setUser(user);
        connectionFactory.setPassword(password);
        connectionFactory.setConnectionTTL(-1L);
        connectionFactory.setClientID(clientID);
        connectionFactory.setEnableSharedClientID(true);
        connectionFactory.setPreAcknowledge(Boolean.FALSE);
        return connectionFactory;
    }

    @Bean("mqCachingConnectionFactory")
    @Primary
    public ConnectionFactory cachingConnectionFactory() {
        return new CachingConnectionFactory(senderActiveMQConnectionFactory());
    }

    @Bean("jmsTemplate")
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setMessageConverter(jsonMessageConverter);
        jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
        jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
        jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
        return jmsTemplate;
    }

    @PreDestroy
    public void cleanUp() {
        if (connection.isStarted()) {
            try {
                connection.close();
            } catch (JMSException e) {
                log.error("Failed to close the JMS connection", e);
            }
        }
    }

}

1 Answer


In ActiveMQ Artemis, a STOMP ACK frame tells the broker that the message was consumed successfully, so the broker removes it from the queue. A STOMP NACK frame tells the broker that the message was not consumed successfully, and Artemis responds by discarding it. The STOMP specification leaves the exact behavior to the server. It says only:

NACK is the opposite of ACK. It is used to tell the server that the client did not consume the message. The server can then either send the message to a different client, discard it, or put it in a dead letter queue. The exact behavior is server specific.

NACK takes the same headers as ACK: id (REQUIRED) and transaction (OPTIONAL).

NACK applies either to one single message (if the subscription's ack mode is client-individual) or to all messages sent before and not yet ACK'ed or NACK'ed (if the subscription's ack mode is client).

If you want the message to be redelivered, neither ACK nor NACK it. When the consumer's connection is closed, the broker places the message back on the queue for delivery to another (or the same) client.
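
The approach above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the question's actual handler: AckSession, RedeliveryFriendlyHandler and RecordingSession are hypothetical names, and AckSession stands in for the two StompSession calls the question's code already uses (acknowledge(messageId, true) and disconnect()). On success the handler ACKs. On failure it sends neither ACK nor NACK and closes the connection, so that the broker can return the unacknowledged message to the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the two StompSession calls the handler needs. */
interface AckSession {
    void ack(String messageId); // would map to session.acknowledge(messageId, true)
    void disconnect();          // would map to session.disconnect()
}

/** ACKs on success; on failure sends neither ACK nor NACK and drops the connection. */
final class RedeliveryFriendlyHandler {
    private final AckSession session;
    private final Consumer<String> processor;

    RedeliveryFriendlyHandler(AckSession session, Consumer<String> processor) {
        this.session = session;
        this.processor = processor;
    }

    void handleFrame(String messageId, String payload) {
        try {
            processor.accept(payload);
        } catch (RuntimeException e) {
            // No ACK and no NACK: closing the connection leaves the message
            // unacknowledged, so the broker can put it back on the queue.
            session.disconnect();
            return;
        }
        // Processing succeeded, so the broker may remove the message.
        session.ack(messageId);
    }
}

/** In-memory fake that records calls, used only to exercise the sketch. */
final class RecordingSession implements AckSession {
    final List<String> calls = new ArrayList<>();

    @Override
    public void ack(String messageId) {
        calls.add("ACK " + messageId);
    }

    @Override
    public void disconnect() {
        calls.add("DISCONNECT");
    }
}
```

With this policy the client has to reconnect and resubscribe after a failure. The question's handleTransportError already re-initializes the session when the connection is lost, but an explicit disconnect may need its own reconnect path.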

In the future I expect this behavior will be configurable.