I have a Camel route that reads from an ActiveMQ JMS queue, does some processing, and passes the result on to a remote destination. If the communications part of this process fails, I want to retry indefinitely (until the destination is "up"). I can handle this by making the route transacted and setting up a redelivery policy. Redelivery is configured in two places: in Camel (via org.apache.camel.processor.RedeliveryPolicy, to reattempt the failed part of the route) and on the ActiveMQConnectionFactory (via org.apache.activemq.RedeliveryPolicy, to reattempt the entire route).

I also have a requirement that we should be able to delete an entry from the JMS queue (which I do via an application that talks to ActiveMQ via JMX), and processing should move to the next message.

The issue is that I can either allow message deletions (by setting cacheLevelName=CACHE_NONE on the consumer) or have the connection handle retries (by setting cacheLevelName=CACHE_CONSUMER on the consumer), but not both.

Here is my current setup:

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="${jms.brokerUrl}" />
  <property name="redeliveryPolicy" ref="amqRedeliveryPolicy" />
  <property name="prefetchPolicy">
    <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
      <property name="all" value="0"/>
    </bean>
  </property>
</bean> 

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
  <property name="transactionManager" ref="jmsTransactionManager"/>
  <property name="transacted" value="true"/>
  <property name="concurrentConsumers" value="1"/>
</bean>

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent" >
  <property name="configuration" ref="jmsConfig" />
</bean>

<!-- Redelivery policy for the ActiveMQ connection (set on the connection factory), so the message is retried through the entire route -->
<bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  <property name="initialRedeliveryDelay" value="5000" />
  <property name="redeliveryDelay" value="5000" />
  <property name="maximumRedeliveries" value="-1" />
  <property name="queue" value=">" />
</bean>

<!-- Redelivery Policy for Camel Internal, so it will retry from the part of the exchange that's failed in case of comms issues -->
<bean id="redeliveryProfile" class="org.apache.camel.processor.RedeliveryPolicy">
  <property name="maximumRedeliveries" value="2"/>
  <property name="redeliveryDelay" value="500"/>
</bean>

<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <constructor-arg>
    <bean class="org.springframework.transaction.support.TransactionTemplate">
      <property name="transactionManager" ref="jmsTransactionManager"/>
    </bean>
  </constructor-arg>
</bean>

<camel:camelContext id="testContext">
  <camel:routeBuilder ref="transactedRoute" />
</camel:camelContext>

With a very simple route:

@Component
public class TransactedRoute extends SpringRouteBuilder {
  @Override
  public void configure() {
    onException(Exception.class) // we would handle comms exceptions here
      .redeliveryPolicyRef("redeliveryProfile")
      .rollback();

    from("jms:queue:myqueue?cacheLevelName=CACHE_CONSUMER")
      .transacted("PROPAGATION_REQUIRED")
      .bean(Bean1.class)
      .to(DESTINATION); // this could throw a comms exception
  }
}

With this setup, the route reads from jms:queue:myqueue, and in the case of a comms exception, Camel internally retries the failed to(...) step twice (redeliveryProfile). Then, after a 5-second delay, the message is sent through the entire route again (amqRedeliveryPolicy). This continues until I stop the test.

However, if I delete the message from ActiveMQ, it continues to be processed by the route, despite it no longer being in the queue.

If I change the consumer to be:

from("jms:queue:myqueue?cacheLevelName=CACHE_NONE")

I can now delete the message from ActiveMQ and the route stops processing it... but amqRedeliveryPolicy is ignored, the message is retried immediately (no 5-second delay) and after 6 attempts (the default for AMQ) it is put into the Dead Letter Queue.

So, is there a way to achieve both by modifying my configuration?

Or have I completely missed the point somewhere along the lines?

Comment: I did some more playing around and found that setting the acknowledgementModeName to CLIENT_ACKNOWLEDGE on the to(...) of the route meant that I got continual retries, with message deletion recognised, but the message was redelivered immediately (no 5s delay) after the Camel-internal redelivery. - icabod

1 Answer


Sounds a bit like a bad design if you need human intervention to delete a specific message on a queue via JMX.

Maybe you can let AMQ move the message to a DLQ after X failed attempts; redelivering forever is also bad design. You can raise the default to a higher value if 6 attempts is too few.
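As a rough sketch of the "raise the default" option, the amqRedeliveryPolicy bean from the question could be given a finite but larger retry budget. The limit of 20 and the back-off values below are illustrative assumptions, not recommendations; pick numbers that match how long the remote destination is realistically down.

```xml
<!-- Sketch only: finite retries with exponential back-off (all values are assumptions) -->
<bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  <!-- wait 5 s before the first redelivery, as in the question -->
  <property name="initialRedeliveryDelay" value="5000" />
  <property name="redeliveryDelay" value="5000" />
  <!-- double the delay on each attempt, capped at 60 s -->
  <property name="useExponentialBackOff" value="true" />
  <property name="backOffMultiplier" value="2" />
  <property name="maximumRedeliveryDelay" value="60000" />
  <!-- after 20 redeliveries the message goes to the DLQ instead of retrying forever -->
  <property name="maximumRedeliveries" value="20" />
</bean>
```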

Then you can inspect the messages in the DLQ and try to understand why they failed, and you can always safely delete messages from, or purge, the DLQ via JMX or other tools.

AMQ allows you to have a DLQ per queue, which you can configure with a DLQ prefix etc., instead of one common DLQ.
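For illustration, a per-queue DLQ is configured on the broker side, in activemq.xml, rather than in the Camel/Spring client configuration. The fragment below is a minimal sketch that assumes you want this for every queue (the ">" wildcard) and a "DLQ." prefix, so failed messages from myqueue would land in DLQ.myqueue.

```xml
<!-- Sketch only: goes inside the <broker> element of activemq.xml -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <!-- ">" matches all queues; narrow this if only some queues need their own DLQ -->
      <policyEntry queue=">">
        <deadLetterStrategy>
          <!-- each queue gets its own dead letter queue named DLQ.<original queue name> -->
          <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
        </deadLetterStrategy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```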