I am trying to implement the following scenario with the JMS API, using Spring's DefaultMessageListenerContainer to receive messages.
Scenario: I have a publisher that publishes messages to a RabbitMQ exchange, and a listener that consumes messages from a RabbitMQ queue. If an exception occurs while the listener is processing a received message, that message (or group of messages) should be rolled back and put back on the same queue.
I set the container's "sessionTransacted" property to true to achieve this through a local transaction. However, my messages are not rolled back to the original queue when an exception occurs. Can someone tell me what I am missing here?
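To make the expected behavior concrete, here is a minimal in-memory sketch of what I understand a locally transacted consumer should do. It is only a toy model, not the JMS, Spring, or RabbitMQ API; the class TransactedQueueSketch and its methods are hypothetical names made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy in-memory model of a locally transacted consumer (hypothetical, not a real JMS API). */
public class TransactedQueueSketch {

    private final Deque<String> queue = new ArrayDeque<>();

    /** Appends a message to the tail of the queue. */
    public void publish(String message) {
        queue.addLast(message);
    }

    /** Returns a snapshot of the messages still waiting on the queue. */
    public List<String> pending() {
        return new ArrayList<>(queue);
    }

    /**
     * Delivers the message at the head of the queue to the listener.
     * Returns true if the "transaction" committed, false if it rolled back
     * or there was nothing to deliver.
     */
    public boolean deliverNext(Consumer<String> listener) {
        String message = queue.pollFirst();
        if (message == null) {
            return false; // nothing to deliver
        }
        try {
            listener.accept(message);
            return true; // commit: the message stays removed
        } catch (RuntimeException ex) {
            queue.addFirst(message); // rollback: the message returns to the same queue
            return false;
        }
    }

    public static void main(String[] args) {
        TransactedQueueSketch broker = new TransactedQueueSketch();
        broker.publish("message-34");
        broker.publish("message-35");

        // The listener fails for any message ending in "35", like my real listener.
        Consumer<String> listener = text -> {
            if (text.endsWith("35")) {
                throw new IllegalStateException("Simulated failure for " + text);
            }
        };

        System.out.println(broker.deliverNext(listener)); // true  (committed)
        System.out.println(broker.deliverNext(listener)); // false (rolled back)
        System.out.println(broker.pending());             // [message-35]
    }
}
```

In this model the rollback happens only because the exception propagates out of the listener; a listener that caught and swallowed the exception would count as a successful delivery and the message would be gone.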
The Spring bean configuration is as follows:
<bean id="rmqConnectionFactory" class="com.rabbitmq.jms.admin.RMQConnectionFactory">
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="host" value="localhost"/>
    <property name="port" value="5672"/>
    <property name="virtualHost" value="/"/>
</bean>

<bean id="rmqDestination" class="com.rabbitmq.jms.admin.RMQDestination">
    <property name="destinationName" value="testDestination"/>
    <property name="amqpExchangeName" value="testExchange"/>
    <property name="amqpRoutingKey" value="testRouting"/>
    <property name="amqpQueueName" value="testDestination"/>
</bean>

<bean id="messageListener" class="org.jms.SimpleJMSMessageListener"/>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="rmqConnectionFactory"/>
    <property name="destination" ref="rmqDestination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="sessionTransacted" value="true"/>
</bean>
The message listener is as follows:
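package org.jms;

import javax.jms.JMSException; // assumes the javax.jms namespace; newer stacks use jakarta.jms
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.slf4j.Logger; // assumes SLF4J, based on the {} placeholders in the log calls
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Transactional;

public class SimpleJMSMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(SimpleJMSMessageListener.class);

    @Transactional
    @Override
    public void onMessage(Message message) {
        int[] num = {1, 2, 3, 4};
        if (message instanceof TextMessage) {
            try {
                String receivedMessage = ((TextMessage) message).getText();
                logger.info("Received message via JMS message Listener-Consumer: {}", receivedMessage);
                if (receivedMessage.endsWith("35")) {
                    // Simulate a processing failure: index 5 is out of bounds, so the next
                    // statement throws ArrayIndexOutOfBoundsException (a RuntimeException).
                    logger.info("Error processing the message");
                    int i = num[5];
                }
            } catch (JMSException ex) {
                // Only JMSException is caught here; the simulated failure propagates to the container.
                logger.error("error processing incoming jms message-Consumer", ex);
            }
        } else {
            logger.warn("Received non text jms message-Consumer {}", message);
        }
    }
}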