3
votes

I like to read pending (not acknowledged) messages in a ActiveMQ queue using Spring boot. How to do that?

So far I can read a message the moment it is send to the queue:

@JmsListener(destination = "LOCAL.TEST", 
  containerFactory = "myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
    String messageData = null;
    // jsonMessage.acknowledge(); // dont consume message (for testing)
    LOGGER.info("=== Received message {}", jsonMessage);
}

using a standard configuration for the mq-connection:

@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL(BROKER_URL + ":" + BROKER_PORT);
    return activeMQConnectionFactory;
}

and a standard ListenerContainerFactory:

@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
  DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  factory.setConnectionFactory(getActiveMQConnectionFactory());
  factory.setConcurrency("1-1");
  return factory;
}

But this just loggs a message if I manually send one using

@Autowired
private JmsTemplate jmsTemplate;

public void send(String destination, String message) {
    LOGGER.info("sending message='{}' to destination='{}'", message, destination);
    jmsTemplate.convertAndSend(destination, message);
}

with the standard template

@Bean
public JmsTemplate jmsTemplate() {
  JmsTemplate template = new JmsTemplate();
  template.setConnectionFactory(getActiveMQConnectionFactory());
  return template;
}

I cannot read messages sent earlier that are still in the Queue (since I didn't .acknowledge() them)...

3

3 Answers

4
votes

JMS supports "browsing" messages which appears to be the functionality you want. You should therefore change your Spring application to use a QueueBrowser instead of actually consuming the messages.

1
votes

Messages won't be resent if not acknowledged. They are not returned to the queue until the session is closed or the connection lost, for example by stopping (and restarting) the listener container created by the factory.

You can access the container using the JmsListenerEndpointRegistry bean (or stop/start the entire registry which will stop/start all of its containers).

1
votes

To read all pending messages, you can do like this

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
    Connection connection = connectionFactory.createConnection("admin", "admin");
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("listenerQueue");
    MessageConsumer consumer = session.createConsumer(destination);

    QueueBrowser browser = session.createBrowser((Queue) destination);
    Enumeration elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        Message message = (Message) consumer.receive();

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() + "'");
            message.acknowledge();
        }
    }
    connection.close();

Step by step implementation of Spring boot ActiveMQ. Lets write some code to make it more clear. This will help to read all pending messages in current session only.

  1. Add these dependencies in pom.xml file.
<!-- Dependencies to setup JMS and active mq environment -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-broker</artifactId>
        </dependency>
  1. Add @EnableJms into your main controller where your main() method exists.
  2. Create connection factory by adding these 2 methods in application controller only.
@Bean
    public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
      DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
      logger.info("configuring jms connection factory....");
      // anonymous class
      factory.setErrorHandler(
              new ErrorHandler() {
                  @Override
                  public void handleError(Throwable t) {
                      logger.error("An error has occurred in the transaction", t);
                  }
              });
      // lambda function
      factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
      configurer.configure(factory, connectionFactory);

      return factory;
    }

    // Serialize message content to json using TextMessage
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
      MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
      converter.setTargetType(MessageType.TEXT);
      converter.setTypeIdPropertyName("_type");
      return converter;
    } 
  1. Mention credentials in in application.yml file as
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1

  1. Autowire jmsTemplate in any spring bean class.
@Autowired
private JmsTemplate jmsTemplate;
  1. Now it is time to send message to a queue.
jmsTemplate.convertAndSend("anyQueueName", "value1");
jmsTemplate.convertAndSend("anyQueueName", "value2");
...
  1. Add a jmslistener. This method will be called automatically by JMS when any message will be pushed to queue.
@JmsListener(destination ="anyQueueName", containerFactory = "myFactory")
    public void receiveMessage(String user) {
        System.out.println("Received <" + user + ">");
    }
  1. Manually you can read the messages available in queue:-
import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

public void readMessageFromQueue(){
jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
            @Override
            public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
                Enumeration<TextMessage> messages = browser.getEnumeration();
                while (messages.hasMoreElements()) {
                    System.out.println("message found : -"+ messages.nextElement().getText());
                }
            }
        });
}

Output :-
message found :- value1

message found :- value2

-Happy Coding