High-level architecture
JMS (Producer/Consumer) <----> Artemis(STOMP) <----> Websocket-Broker-Relay-Service <----> STOMP-over-Websocket-client (Producer/Consumer)
Some observations
In STOMP consumer, with client-individual ack-subscription, whether I NACK or ACK, the message is discarded by Artemis. I want the message to be redelivered to the same or any other consumer. Is there a way to achieve it?
In JMS consumer, the durable-message is not delivered if the consumer was down when the message was received on Artemis. My expectation was that once the consumer service comes back up again, the durable message will be delivered.
class StompSessionHandlerImpl implements StompSessionHandler {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
session.setAutoReceipt(Boolean.FALSE);
StompHeaders headers1 = new StompHeaders();
headers1.setDestination("/queue/msg");
headers1.add("durable-subscription-name", messagingUtil.getServiceSubscriptionChannel());
headers1.add("Authorization", "Bearer ".concat(token));
headers1.setAck("client-individual");
session.subscribe(headers1, this);
}
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
session.acknowledge(Objects.requireNonNull(headers.getMessageId()), false);
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
synchronized (StompSessionHandlerImpl.msgSenderLock) {
if (exception instanceof ConnectionLostException && !getStompSession().isConnected()) {
initStompSession();
}
}
}
@Override
public Type getPayloadType(StompHeaders headers) {
return COMessage.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
if (payload == null) return;
COMessage msg = (COMessage) payload;
try {
stompMessagingService.handleReceivedMessages(msg);
self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), true);
} catch (Exception e) {
self.stompMessagingService.getStompSession().acknowledge(headers.getMessageId(), false);
}
}
@PreDestroy
public void cleanUp() {
self.stompMessagingService.getStompSession().disconnect();
}
}
class WebSocketConfig extends WebSocketMessagingAutoConfiguration {
@Bean
public WebSocketStompClient stompClient() {
WebSocketClient simpleWebSocketClient = new StandardWebSocketClient();
List<Transport> transports = List.of(new WebSocketTransport(simpleWebSocketClient));
SockJsClient sockJsClient = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
converter.setObjectMapper(objectMapper);
stompClient.setMessageConverter(converter);
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10000);
scheduler.initialize();
stompClient.setTaskScheduler(scheduler);
stompClient.setDefaultHeartbeat(new long[]{20000, 20000});
stompClient.setReceiptTimeLimit(Integer.MAX_VALUE);
ContainerProvider.getWebSocketContainer().setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE);
return stompClient;
}
}
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private String host;
private String password;
private String user;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/queue", "/topic", "/exchange")
.setRelayHost(host)
.setClientLogin(user)
.setClientPasscode(password)
.setSystemHeartbeatSendInterval(20000)
.setSystemLogin(user)
.setSystemPasscode(password)
.setUserDestinationBroadcast("/topic/unresolved-user")
.setUserRegistryBroadcast("/topic/log-user-registry");
config.setApplicationDestinationPrefixes("/device");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS().setWebSocketEnabled(Boolean.TRUE);
registry.setErrorHandler(new StompSubProtocolErrorHandler());
}
@Bean
public DefaultSimpUserRegistry getDefaultSimpRegistry() {
return new DefaultSimpUserRegistry();
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(Integer.MAX_VALUE);
registry.setSendBufferSizeLimit(Integer.MAX_VALUE);
registry.setTimeToFirstMessage(300000);
registry.setSendTimeLimit(300000);
registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
@Override
public WebSocketHandler decorate(WebSocketHandler webSocketHandler) {
return new EmaWebSocketHandlerDecorator(webSocketHandler);
}
});
}
}
class ArtemisConfig extends ArtemisAutoConfiguration {
@Bean("mqConnectionFactory")
public ConnectionFactory senderActiveMQConnectionFactory() {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://".concat(host.concat(":").concat(port)));
connectionFactory.setUser(user);
connectionFactory.setPassword(password);
connectionFactory.setConnectionTTL(-1L);
connectionFactory.setClientID(clientID);
connectionFactory.setEnableSharedClientID(true);
connectionFactory.setPreAcknowledge(Boolean.FALSE);
return connectionFactory;
}
@Bean("mqCachingConnectionFactory")
@Primary
public ConnectionFactory cachingConnectionFactory() {
return new CachingConnectionFactory(senderActiveMQConnectionFactory());
}
@Bean("jmsTemplate")
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
jmsTemplate.setMessageConverter(jsonMessageConverter);
jmsTemplate.setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
jmsTemplate.setMessageIdEnabled(Boolean.TRUE);
jmsTemplate.setTimeToLive(Integer.MAX_VALUE); // TODO : review
return jmsTemplate;
}
@PreDestroy
public void cleanUp() {
if (connection.isStarted()) {
try {
connection.close();
} catch (JMSException e) {
log.error("Failed to close the JMS connection {0}", e);
}
}
}
}