For POC purposes, I have a built a Spring Boot application that uses ActiveMQ for messaging via JMSTemplate.
For monitoring, I want to listen for messages that are put on and removed from queues using "Advisory Topics".
I have updated the ActiveMQ configuration to enable the relevant advisories:
<!-- activemq.xml -->
<broker xmlns="http://activemq.apache.org/schema/core" useJmx="true" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" advisoryForConsumed="true" advisoryForDelivery="true">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" advisoryForConsumed="true" advisoryForDelivery="true">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
In the application I have configured JMS connection factory and JMS listener container factory to enable advisories and pubsub domain, and set up listeners for the advisory topics:
@Configuration
public class JmsConfig {
@Autowired
MessageListener messageListener;
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setWatchTopicAdvisories(true);
connectionFactory.setBrokerURL("vm://localhost?broker.persistent=false");
return connectionFactory;
}
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) throws JMSException {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
configurer.configure(factory, connectionFactory);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue bulkQueue = session.createQueue("bulk");
Topic deliveredAdvisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(bulkQueue);
MessageConsumer deliveredAdvisoryTopicConsumer = session.createConsumer(deliveredAdvisoryTopic);
deliveredAdvisoryTopicConsumer.setMessageListener(messageListener);
Topic consumedAdvisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic(bulkQueue);
MessageConsumer consumedAdvisoryTopicConsumer = session.createConsumer(consumedAdvisoryTopic);
consumedAdvisoryTopicConsumer.setMessageListener(messageListener);
return factory;
}
The listener that will read advisory topics is just for logging:
@Component
public class AdvisoryMessageListener implements MessageListener {
@Override public void onMessage(Message message) {
System.out.println("Received advisory message");
System.out.println(message);
}
}
The actual listener that will read from queue is similar to advisory message listener:
@Component
public class Receiver {
@JmsListener(destination = "bulk", containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(Email email) {
System.out.println("Received <" + email + ">");
}
}
The Rest API will trigger the application to put messages on the queue:
@RestController("/emails")
public class EmailController {
@Autowired
private JmsTemplate jmsTemplate;
@PostMapping("/")
public void persistEmail(@RequestBody Email email) {
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setTimeToLive(0L);
jmsTemplate.convertAndSend("bulk", email);
}
}
Whenever the API is called and the email is put on the queue, Receiver.receiveMessage reads it and logs it but there is no action in the AdvisoryMessageListener.
The only thing that shows up in console is the following:
Received <Email{[email protected], body=Hello}>
Printed by Receiver.receiveMessage
What am I doing wrong?
@Bean
definition; it's too early in the context lifecycle. Also, why don't you use Spring to create the listener containers for the advisory destinations rather than using the JMS API directly? – Gary Russell