We are develop a micro-service system that use ActiveMQ Artemis as the communication method between service. Since the requirement ask to be able to stop the listeners at runtime, we can not use @JmsListener
provide by spring-artemis. After digging the internet and finding out that spring use MessageListenerContainer
behind the scence, we come up with the idea of maintain a list of MessageListenerContainer
our self.
@Bean(name = "commandJmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory commandJmsListenerContainerFactory(
DefaultJmsListenerContainerFactoryConfigurer configurer) {
var factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(false);
return factory;
}
// Use
private Map<String, DefaultMessageListenerContainer> commandQueue;
public void subscribeToCommandQueue(String queueName, CommandListener<?> command) {
commandQueue.computeIfAbsent(queueName, key -> {
var endPoint = new SimpleJmsListenerEndpoint();
endPoint.setDestination(queueName);
endPoint.setMessageListener(message -> {
try {
var body = message.getBody(String.class);
command.execute(commandMessageConverter.deserialize(body));
} catch (JMSException e) {
throw new RuntimeException("Error while process message for queue: " + queueName, e);
}
});
var container = commandJmsListenerContainerFactory.createListenerContainer(endPoint);
// https://stackguides.com/questions/44555106/defaultmessagelistenercontainer-not-reading-messages-from-ibm-mq
// for Every object of Spring classes that implement InitializingBean created manually, we need to call afterPropertiesSet to make the object "work"
container.afterPropertiesSet();
container.start();
return container;
});
}
public void start() {
commandQueue = new ConcurrentHashMap<>();
}
public void stop() {
commandQueue.values().forEach(DefaultMessageListenerContainer::destroy);
commandQueue.clear();
}
While testing, I notice that after we destroy all the listener by calling stop()
, the queue and the address in the Artemis console are deleted too. It isn't the case for durable subscription.
@Bean(name = "eventJmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory eventJmsListenerContainerFactory(
CachingConnectionFactory cachingConnectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
cachingConnectionFactory.setClientId(UUID.randomUUID().toString());
var factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, cachingConnectionFactory);
factory.setPubSubDomain(true);
factory.setSubscriptionDurable(true);
return factory;
}
// usage is the same as the first block code, except we store multicast subscriptions in another map
private Map<String, DefaultMessageListenerContainer> eventTopic;
After running the unit tests and destroying all the listeners of two maps, only the test-event-topic
address and its queues were kept, the test-command-queue
was deleted. Why both the queues behave differently?
Also, what is the correct behavior? We afraid the auto deletion will remove messages that aren't sent in the queue yet. On the other hand, new queue under test-event-topic
keep being created if we run the test again and again. I think it is because of the line cachingConnectionFactory.setClientId(UUID.randomUUID().toString());
. But for durable subscription, not setting clientId result in error.
The connection factory used in the app is an CachingConnectionFactory
created by spring-artemis