1
votes

I am trying to implement the following scenario and using JMS API. Using DefaultMessageListenerContainer to receive messages

Scenario - I have a publisher that publishes messages to a RabbitMQ exchange and a listener that listens to a RMQ queue to receive messages. If there was an exception while processing the received message on the listener, that message or group of messages should roll back and put back in the same queue.

I am using attribute "session transacted = true" to achieve the above scenario through a local transaction. But, my messages were not rolling back to the original queue when there was an exception. Can someone help if I am missing anything here.

Spring beans configuration is as follows.

<bean id="rmqConnectionFactory" class="com.rabbitmq.jms.admin.RMQConnectionFactory">
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="host" value="localhost"/>
    <property name="port" value="5672"/>
    <property name="virtualHost" value="/"/>
</bean>

<bean id="rmqDestination" class="com.rabbitmq.jms.admin.RMQDestination">
    <property name="destinationName" value="testDestination"/>
    <property name="amqpExchangeName" value="testExchange"/>
    <property name="amqpRoutingKey" value="testRouting"/>
    <property name="amqpQueueName" value="testDestination"/>
</bean>


<bean id="messageListener" class="org.jms.SimpleJMSMessageListener" />
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="rmqConnectionFactory"/>
    <property name="destination" ref="rmqDestination"/>
    <property name="messageListener" ref="messageListener" />
    <property name="sessionTransacted" value="true"/>
</bean>

Message Listener is as follows:

public class SimpleJMSMessageListener implements MessageListener {
    @Transactional
    @Override
    public void onMessage(Message message) {

        int num[] = {1, 2, 3, 4};

        if (message instanceof TextMessage) {
            try {
                String receivedMessage = ((TextMessage) message).getText();
                logger.info("Received message via JMS message Listener-Consumer: {}", receivedMessage);
                if(receivedMessage.endsWith("35")) {
                    int i = num[5];
                    logger.info("Error processing the message");
                               }
            }
            catch (JMSException ex) {
                logger.error("error processing incoming jms message-Consumer", ex);
            }
        }
        else {
            logger.warn("Received non text jms message-Consumer {}", message);
        }
    }

}
1

1 Answers

0
votes

We've been looking at something similar, and I suspect the issue is related to the way the Spring DefaultMessageListenerContainer handles transactions. In particular, there's this quote from the Spring Forum (here http://forum.spring.io/forum/other-spring-related/remoting/24208-what-s-the-best-practice-for-using-jms-in-spring) :

Regarding transaction management: If you specify a "transactionManager" on DefaultMessageListenerContainer, it will wrap its receive loop in a transaction. This is typically used with JtaTransactionManager, where no transactional resources are actually bound: It's only really marking the thread (and the listener's JMS Session) as "XA-active" and waiting for something to happen. This should be pretty efficient with any decent JTA provider.

The implication is that, if you don't specify a transactionManager on the DMLC, then you won't actually have a transaction wrapping your onMessage - even if it's annotated as @Transactional. Also, while it's "typical" to use a JtaTransactionManager, that's likely because JMS Transactions are often distributed, and require XA. I believe you can also use Spring's JmsTranactionManager if you don't need XA.

So in your case, take a look at defining a transaction manager bean, and referencing this as the transactionManager in your DMLC config.