1
votes

We are develop a micro-service system that use ActiveMQ Artemis as the communication method between service. Since the requirement ask to be able to stop the listeners at runtime, we can not use @JmsListener provide by spring-artemis. After digging the internet and finding out that spring use MessageListenerContainer behind the scence, we come up with the idea of maintain a list of MessageListenerContainer our self.

    @Bean(name = "commandJmsListenerContainerFactory")
    public DefaultJmsListenerContainerFactory commandJmsListenerContainerFactory(
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
        var factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }

    // Use
    private Map<String, DefaultMessageListenerContainer> commandQueue;

    public void subscribeToCommandQueue(String queueName, CommandListener<?> command) {
        commandQueue.computeIfAbsent(queueName, key -> {
            var endPoint = new SimpleJmsListenerEndpoint();
            endPoint.setDestination(queueName);
            endPoint.setMessageListener(message -> {
                try {
                    var body = message.getBody(String.class);
                    command.execute(commandMessageConverter.deserialize(body));
                } catch (JMSException e) {
                    throw new RuntimeException("Error while process message for queue: " + queueName, e);
                }
            });
            var container = commandJmsListenerContainerFactory.createListenerContainer(endPoint);
            // https://stackguides.com/questions/44555106/defaultmessagelistenercontainer-not-reading-messages-from-ibm-mq
            // for Every object of Spring classes that implement InitializingBean created manually, we need to call afterPropertiesSet to make the object "work"
            container.afterPropertiesSet();
            container.start();
            return container;
        });
    }

    public void start() {
        commandQueue = new ConcurrentHashMap<>();
    }

    public void stop() {
        commandQueue.values().forEach(DefaultMessageListenerContainer::destroy);
        commandQueue.clear();
    }

While testing, I notice that after we destroy all the listener by calling stop() , the queue and the address in the Artemis console are deleted too. It isn't the case for durable subscription.

    @Bean(name = "eventJmsListenerContainerFactory")
    public DefaultJmsListenerContainerFactory eventJmsListenerContainerFactory(
        CachingConnectionFactory cachingConnectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
        cachingConnectionFactory.setClientId(UUID.randomUUID().toString());
        var factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, cachingConnectionFactory);
        factory.setPubSubDomain(true);
        factory.setSubscriptionDurable(true);
        return factory;
    }

    // usage is the same as the first block code, except we store multicast subscriptions in another map
    private Map<String, DefaultMessageListenerContainer> eventTopic;

After running the unit tests and destroying all the listeners of two maps, only the test-event-topic address and its queues were kept, the test-command-queue was deleted. Why both the queues behave differently?

enter image description here

Also, what is the correct behavior? We afraid the auto deletion will remove messages that aren't sent in the queue yet. On the other hand, new queue under test-event-topic keep being created if we run the test again and again. I think it is because of the line cachingConnectionFactory.setClientId(UUID.randomUUID().toString()); . But for durable subscription, not setting clientId result in error.

The connection factory used in the app is an CachingConnectionFactory created by spring-artemis

1

1 Answers

2
votes

By default the broker will auto-create addresses and queues as required when a message is sent or a consumer is created by the core JMS client. These resources will also be auto-deleted by default when they're no longer needed (i.e. when a queue has no consumers and messages or when an address no longer has any queues bound to it). This is controlled by these settings in broker.xml which are discussed in the documentation:

  • auto-create-queues
  • auto-delete-queues
  • auto-create-addresses
  • auto-delete-addresses

To be clear, auto-deletion should not cause any message loss by default as queues should only be deleted when they have 0 consumers and 0 messages. However, you can always set auto-deletion to false to be 100% safe.

Queues representing durable JMS topic subscriptions won't be deleted as they are meant to stay and gather messages while the consumer is offline. In other words, a durable topic subscription will remain if the client using the subscription is shutdown without first explicitly removing the subscription. That's the whole point of durable subscriptions - they are durable. Any client can use a durable topic subscription if it connects with the same client ID and uses the same subscription name. However, unless the durable subscription is a "shared" durable subscription then only one client at a time can be connected to it. Shared durable topic subscriptions were added in JMS 2.0.