I have a Camel Route that reads from an ActiveMQ JMS Queue, does some processing, and passes the result on to a remote destination. If the communications part of this process fails, I want to retry indefinitely (until the destination is "up"). I can handle this by making the route transacted and setting up the redeliveryPolicy. The redelivery is set in both Camel (via camel.processor.RedeliveryPolicy, to reattempt the failed part of the route) and the ActiveMQConnectionFactory (via org.apache.activemq.RedeliveryPolicy, to reattempt the entire route).
I also have a requirement that we should be able to delete an entry from the JMS queue (which I do via an application that talks to ActiveMQ via JMX), and processing should move to the next message.
The issue is that I can either allow message deletions (by setting the consumer to be cacheLevelName=CACHE_NONE), or have the connection handle retries (by setting the consumer to be cacheLevelName=CACHE_CONSUMER), but not both.
Here is my current setup:
    <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
    </bean>

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.brokerUrl}" />
        <property name="redeliveryPolicy" ref="amqRedeliveryPolicy" />
        <property name="prefetchPolicy">
            <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
                <property name="all" value="0"/>
            </bean>
        </property>
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
        <property name="transactionManager" ref="jmsTransactionManager"/>
        <property name="transacted" value="true"/>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent" >
        <property name="configuration" ref="jmsConfig" />
    </bean>

    <!-- Redelivery Policy for ActiveMQ (the Broker), so it will perform retries of the entire route -->
    <bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <property name="initialRedeliveryDelay" value="5000" />
        <property name="redeliveryDelay" value="5000" />
        <property name="maximumRedeliveries" value="-1" />
        <property name="queue" value=">" />
    </bean>

    <!-- Redelivery Policy for Camel Internal, so it will retry from the part of the exchange that's failed in case of comms issues -->
    <bean id="redeliveryProfile" class="org.apache.camel.processor.RedeliveryPolicy">
        <property name="maximumRedeliveries" value="2"/>
        <property name="redeliveryDelay" value="500"/>
    </bean>

    <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
        <constructor-arg>
            <bean class="org.springframework.transaction.support.TransactionTemplate">
                <property name="transactionManager" ref="jmsTransactionManager"/>
            </bean>
        </constructor-arg>
    </bean>

    <camel:camelContext id="testContext">
        <camel:routeBuilder ref="transactedRoute" />
    </camel:camelContext>
With a very simple route:
    import org.apache.camel.spring.SpringRouteBuilder;
    import org.springframework.stereotype.Component;

    @Component
    public class TransactedRoute extends SpringRouteBuilder {

        @Override
        public void configure() {
            onException(Exception.class) // we would handle comms exceptions here
                .redeliveryPolicyRef("redeliveryProfile")
                .rollback();

            from("jms:queue:myqueue?cacheLevelName=CACHE_CONSUMER")
                .transacted("PROPAGATION_REQUIRED")
                .bean(Bean1.class)
                .to(DESTINATION); // this could throw a comms exception
        }
    }
With this setup, the route reads from jms:queue:myqueue, and in the case of a comms exception, Camel internally retries the route from the to(...) part twice (redeliveryProfile). Then, after a 5-second delay, the message is sent through the entire route again (amqRedeliveryPolicy). This continues until I stop the test.
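To make that timing concrete, here is a minimal plain-Java sketch of the schedule I observe. It does not use Camel or ActiveMQ at all. The class name RetryTimeline and the method attemptTimes are made up for illustration, and the model assumes that processing itself takes no time, that the delays are fixed (no back-off multiplier), and that the 5-second wait starts as soon as the transaction rolls back.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTimeline {

    /**
     * Returns the time (in ms since the first delivery) of every call to the
     * failing endpoint, assuming every call fails.
     *
     * brokerDeliveries      how many times the message is delivered to the route
     * camelMaxRedeliveries  in-route retries per delivery (2 in redeliveryProfile)
     * camelDelayMs          delay between in-route retries (500 in redeliveryProfile)
     * brokerDelayMs         delay before each full redelivery (5000 in amqRedeliveryPolicy)
     */
    public static List<Long> attemptTimes(int brokerDeliveries,
                                          int camelMaxRedeliveries,
                                          long camelDelayMs,
                                          long brokerDelayMs) {
        List<Long> times = new ArrayList<>();
        long now = 0;
        for (int delivery = 0; delivery < brokerDeliveries; delivery++) {
            if (delivery > 0) {
                // The transaction rolled back; wait before the whole route runs again.
                now += brokerDelayMs;
            }
            times.add(now); // first call to the endpoint for this delivery
            for (int retry = 0; retry < camelMaxRedeliveries; retry++) {
                // Camel retries only the failed step, after its own delay.
                now += camelDelayMs;
                times.add(now);
            }
        }
        return times;
    }

    public static void main(String[] args) {
        // Two deliveries with the values from my configuration.
        System.out.println(attemptTimes(2, 2, 500, 5000));
        // prints [0, 500, 1000, 6000, 6500, 7000]
    }
}
```

Each delivery therefore produces three calls to the endpoint (one initial call plus two Camel retries), and the deliveries are separated by the 5-second delay.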
However, if I delete the message from ActiveMQ, it continues to be processed by the route, despite it no longer being in the queue.
If I change the consumer to be:
    from("jms:queue:myqueue?cacheLevelName=CACHE_NONE")
I can now delete the message from ActiveMQ and the route stops processing it... but amqRedeliveryPolicy is ignored: the message is retried immediately (no 5-second delay), and after 6 attempts (the default for AMQ) it is put into the Dead Letter Queue.
So, is there a way to achieve both by modifying my configuration?
Or have I completely missed the point somewhere along the lines?
Setting acknowledgementModeName to CLIENT_ACKNOWLEDGE on the to(...) of the route meant that I got continual retries, with message deletion recognised, but the message was redelivered immediately (no 5s delay) after the Camel-internal redelivery. – icabod