
For POC purposes, I have built a Spring Boot application that uses ActiveMQ for messaging via JmsTemplate.

For monitoring, I want to listen for messages that are put on and removed from queues using "Advisory Topics".

I have updated the ActiveMQ configuration to enable the relevant advisories:

<!-- activemq.xml -->
 <broker xmlns="http://activemq.apache.org/schema/core" useJmx="true" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" advisoryForConsumed="true" advisoryForDelivery="true">
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
                <policyEntry queue=">" advisoryForConsumed="true" advisoryForDelivery="true">
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

</broker>

In the application, I have configured the JMS connection factory and the JMS listener container factory to enable advisories and the pub/sub domain, and set up listeners for the advisory topics:

@Configuration
public class JmsConfig {
    @Autowired
    MessageListener messageListener;

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setWatchTopicAdvisories(true);
        connectionFactory.setBrokerURL("vm://localhost?broker.persistent=false");
        return connectionFactory;
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) throws JMSException {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        configurer.configure(factory, connectionFactory);

        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue bulkQueue = session.createQueue("bulk");

        Topic deliveredAdvisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(bulkQueue);
        MessageConsumer deliveredAdvisoryTopicConsumer = session.createConsumer(deliveredAdvisoryTopic);
        deliveredAdvisoryTopicConsumer.setMessageListener(messageListener);

        Topic consumedAdvisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic(bulkQueue);
        MessageConsumer consumedAdvisoryTopicConsumer = session.createConsumer(consumedAdvisoryTopic);
        consumedAdvisoryTopicConsumer.setMessageListener(messageListener);

        return factory;
    }
}

The listener that will read advisory topics is just for logging:

@Component
public class AdvisoryMessageListener implements MessageListener {
    @Override public void onMessage(Message message) {
        System.out.println("Received advisory message");
        System.out.println(message);
    }
}

The actual listener that reads from the queue is similar to the advisory message listener:

@Component
public class Receiver {

    @JmsListener(destination = "bulk", containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }

}

The REST API triggers the application to put messages on the queue:

@RestController("/emails")
public class EmailController {

    @Autowired
    private JmsTemplate jmsTemplate;

    @PostMapping("/")
    public void persistEmail(@RequestBody Email email) {
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setTimeToLive(0L);
        jmsTemplate.convertAndSend("bulk", email);
    }
}

Whenever the API is called and the email is put on the queue, Receiver.receiveMessage reads and logs it, but AdvisoryMessageListener never receives anything.

The only thing that shows up in the console is the following, printed by Receiver.receiveMessage:

Received <Email{[email protected], body=Hello}>

What am I doing wrong?

I don't have an answer for you (I am not familiar with advisory queues), but you shouldn't be making connections and creating consumers in a @Bean definition; it's too early in the context lifecycle. Also, why don't you use Spring to create the listener containers for the advisory destinations rather than using the JMS API directly? - Gary Russell
@GaryRussell good suggestion, the current setup is definitely not production-ready. I'll update the code accordingly. Thanks! - FarthVader

1 Answer


This works fine for me...

@SpringBootApplication
public class So59196698Application {

    public static void main(String[] args) {
        SpringApplication.run(So59196698Application.class, args);
    }

    @JmsListener(destination = "so59196698")
    public void listen(Message in) {
        System.out.println("Received:" + in);
    }

    @JmsListener(destination = "#{advisoryTopicNames.deliveredTopic}", containerFactory = "topicFactory")
    public void delivered(Message in) {
        System.out.println("Delivered:" + in);
    }

    @JmsListener(destination = "#{advisoryTopicNames.consumedTopic}", containerFactory = "topicFactory")
    public void consumed(Message in) {
        System.out.println("Consumed:" + in);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            Thread.sleep(5000);
            template.convertAndSend("so59196698", "test");
        };
    }

    @Bean
    public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }

}

@Component
class AdvisoryTopicNames {

    private static final Destination QUEUE = new ActiveMQQueue("so59196698");

    public String getDeliveredTopic() throws JMSException {
        return AdvisorySupport.getMessageDeliveredAdvisoryTopic(QUEUE).getTopicName();
    }

    public String getConsumedTopic() throws JMSException {
        return AdvisorySupport.getMessageConsumedAdvisoryTopic(QUEUE).getTopicName();
    }

}

and the output:

Received:ActiveMQTextMessage {commandId = 11, ...
Delivered:ActiveMQMessage {commandId = 0, ...
Consumed:ActiveMQMessage {commandId = 0, ...
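
For reference, the two destinations returned by AdvisoryTopicNames are ordinary topics with predictable names. The following is a minimal, dependency-free sketch of that naming, assuming the ActiveMQ 5.x convention of a fixed prefix, then the destination type, then the queue name. The class AdvisoryTopicNameSketch and its methods are hypothetical helpers for illustration only; in real code, AdvisorySupport remains the authoritative source for these names.

```java
public class AdvisoryTopicNameSketch {

    // Assumed to mirror the prefixes ActiveMQ 5.x uses for these two advisories.
    static final String DELIVERED_PREFIX = "ActiveMQ.Advisory.MessageDelivered.";
    static final String CONSUMED_PREFIX = "ActiveMQ.Advisory.MessageConsumed.";

    // Name of the "message delivered" advisory topic for a queue (hypothetical helper).
    static String deliveredTopicForQueue(String queueName) {
        return DELIVERED_PREFIX + "Queue." + queueName;
    }

    // Name of the "message consumed" advisory topic for a queue (hypothetical helper).
    static String consumedTopicForQueue(String queueName) {
        return CONSUMED_PREFIX + "Queue." + queueName;
    }

    public static void main(String[] args) {
        // Prints the topic names a listener would subscribe to for queue "so59196698".
        System.out.println(deliveredTopicForQueue("so59196698"));
        System.out.println(consumedTopicForQueue("so59196698"));
    }
}
```

Printing the resolved names at startup can be a quick way to check that the listeners subscribe to the topics you expect for a given queue.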