I am using the Dead Letter Channel EIP exactly as described in the Camel docs for Dead Letter Channel. Here is my camel.xml (headers removed):
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <!-- this is the Dead Letter Channel error handler, where we send failed message to a log endpoint -->
  <route errorHandlerRef="myDeadLetterErrorHandler">
    <from uri="jms:q1" />
    <doTry>
      <validate><simple>${body[subsc]} regex '[a-z0-9]{8}' </simple> </validate>
      <choice>
        <when>
          <simple>${body[key1]} regex '^(v1)$'</simple>
          <choice>
            <when>
              <simple>${body[key2]} == 'v2'</simple>
              <to uri="jms:getInfo" />
            </when>
          </choice>
        </when>
        <otherwise>
          <to uri="jms:invalid" />
          <stop />
        </otherwise>
      </choice>
      <doCatch>
        <exception>org.apache.camel.ValidationException</exception>
        <to uri="jms:invalid"/>
      </doCatch>
    </doTry>
  </route>
  <route>
    <from uri="jms:queue:deadlc" />
    <log message="Got ${body}" loggingLevel="ERROR" logName="cool" />
  </route>
</camelContext>
<!--
  Lets configure some Camel endpoints
  http://camel.apache.org/components.html
-->
<!-- configure the camel activemq component to use the current broker -->
<bean id="confact" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="vm://amq-broker?create=false"/>
  <property name="userName" value="${activemq.username}"/>
  <property name="password" value="${activemq.password}"/>
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
  <property name="connectionFactory" ref="confact" />
</bean>
<bean id="pooledConnectionFactory"
      class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start"
      destroy-method="stop">
  <property name="maxConnections" value="8" />
  <property name="connectionFactory" ref="confact" />
  <property name="expiryTimeout" value="-1" />
  <property name="idleTimeout" value="-1" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="pooledConnectionFactory"/>
  <property name="concurrentConsumers" value="8"/>
</bean>
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="jmsConfig"/>
</bean>
<bean id="myDeadLetterErrorHandler" class="org.apache.camel.builder.DeadLetterChannelBuilder">
  <property name="deadLetterUri" value="jms:queue:deadlc"/>
  <property name="redeliveryPolicy" ref="myRedeliveryPolicyConfig"/>
</bean>
<bean id="myRedeliveryPolicyConfig" class="org.apache.camel.processor.RedeliveryPolicy">
  <property name="maximumRedeliveries" value="3"/>
  <property name="redeliveryDelay" value="5000"/>
</bean>
I have just one route with a Content Based Router, which essentially says: if the message body has getInfo, route from jms:foo to jms:getInfo; if the message body has performAction, route from jms:foo to jms:performAction.
I expect that when the jms:getInfo consumer is not running, delivery would fail and redelivery would be attempted as specified in the Spring XML. But nothing happens and nothing goes to the dead letter queue; instead ActiveMQ throws the exception shown below.
Can someone explain why it is not working? My understanding is that when the consumer (the destination that receives the message from the content based router) is not running, ActiveMQ knows it should not even deliver the message and should immediately throw an exception, which is then handled by Camel's dead letter channel as configured.
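For comparison, here is a minimal sketch of the same route without doTry. It rests on an assumption about a possible cause, not a confirmed fix: the Camel documentation for doTry/doCatch notes that the regular error handler is not used inside a doTry block, so an exception that no doCatch matches would not reach myDeadLetterErrorHandler. The sketch moves the ValidationException handling into a route-scoped onException; the endpoints and bean ids are the ones from the camel.xml above, and the restructuring itself is hypothetical.

```xml
<route errorHandlerRef="myDeadLetterErrorHandler">
  <from uri="jms:q1" />
  <!-- Assumption: a route-scoped onException takes over the role of the doCatch.
       handled=true marks validation failures as dealt with once they are sent to jms:invalid. -->
  <onException>
    <exception>org.apache.camel.ValidationException</exception>
    <handled><constant>true</constant></handled>
    <to uri="jms:invalid" />
  </onException>
  <validate><simple>${body[subsc]} regex '[a-z0-9]{8}' </simple> </validate>
  <choice>
    <when>
      <simple>${body[key1]} regex '^(v1)$'</simple>
      <choice>
        <when>
          <simple>${body[key2]} == 'v2'</simple>
          <!-- With no doTry around it, a failure here is left to the route's error handler -->
          <to uri="jms:getInfo" />
        </when>
      </choice>
    </when>
    <otherwise>
      <to uri="jms:invalid" />
      <stop />
    </otherwise>
  </choice>
</route>
```

Whether this changes the behaviour described here would need to be verified against Camel 2.10.3; the sketch only isolates the doTry block as one variable to test.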
I am using Camel 2.10.3 and ActiveMQ 5.8.0.
I suspect that ActiveMQ is handling the failed message delivery itself and ignoring the Camel XML configuration for the dead letter queue, because these failed messages show up in ActiveMQ.DLQ in the ActiveMQ web console. If the consumer is running, messages are delivered successfully according to the routing specified in camel.xml.
0:58:46,317 | DEBUG | Sending JMS message to: queue://getInfo with message: ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = a79a0f9e-c88f-4501-a56d-2d5667aa98e0, replyTo = temp-queue://ID:myhost-57639-1382233787365-3:3:6, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID:myhost-57439-1382232345453-1:1:1:1:6, CamelJmsDeliveryMode=1}, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {cluster=x78} } | org.apache.camel.component.jms.JmsConfiguration | Camel (camel) thread #2 - JmsConsumer[bar]
20:58:48,044 | DEBUG | Evicting inactive entry ID: 8b0d77a3-afef-4612-a49f-22c1b81eb80e (times out after 20000 millis) | org.apache.camel.component.jms.reply.CorrelationTimeoutMap | Camel (camel) thread #9 - JmsReplyManagerTimeoutChecker[getInfo]
20:58:48,044 | WARN | Timeout occurred after 20000 millis waiting for reply message with correlationID [8b0d77a3-afef-4612-a49f-22c1b81eb80e]. Setting ExchangeTimedOutException on (MessageId: ID:myhost-57439-1382232345453-1:1:1:1:5 on ExchangeId: ID-myhost-57640-1382233788284-0-2) and continue routing. | org.apache.camel.component.jms.reply.TemporaryQueueReplyManager | Camel (camel) thread #9 - JmsReplyManagerTimeoutChecker[getInfo]
20:58:48,045 | WARN | Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: 8b0d77a3-afef-4612-a49f-22c1b81eb80e not received. Exchange[JmsMessage[JmsMessageID: ID:myhost-57439-1382232345453-1:1:1:1:5]]] | org.apache.camel.component.jms.EndpointMessageListener | Camel (camel) thread #4 - JmsConsumer[bar]
org.apache.camel.RuntimeCamelException: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: 8b0d77a3-afef-4612-a49f-22c1b81eb80e not received. Exchange[JmsMessage[JmsMessageID: ID:myhost-57439-1382232345453-1:1:1:1:5]]
    at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1271)
    at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:187)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:108)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: 8b0d77a3-afef-4612-a49f-22c1b81eb80e not received. Exchange[JmsMessage[JmsMessageID: ID:myhost-57439-1382232345453-1:1:1:1:5]]
    at org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:133)
    at org.apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:61)
    at org.apache.camel.component.jms.reply.CorrelationTimeoutMap.onEviction(CorrelationTimeoutMap.java:53)
    at org.apache.camel.component.jms.reply.CorrelationTimeoutMap.onEviction(CorrelationTimeoutMap.java:30)
    at org.apache.camel.support.DefaultTimeoutMap.purge(DefaultTimeoutMap.java:209)
    at org.apache.camel.support.DefaultTimeoutMap.run(DefaultTimeoutMap.java:159)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecu
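The DEBUG entry in the log above shows replyTo = temp-queue://... on the outgoing message, and the WARN entries show a 20000 ms wait for a reply. This suggests the send to jms:getInfo is running as request/reply (InOut) rather than as a one-way send. The fragment below is a hedged sketch of two ways that could be adjusted using the camel-jms URI options exchangePattern and requestTimeout. Which one applies depends on whether a reply is actually expected, and the 5000 ms value is purely illustrative.

```xml
<!-- Option 1 (assumes no reply is needed): send one-way and do not wait for a reply -->
<to uri="jms:getInfo?exchangePattern=InOnly" />

<!-- Option 2 (assumes a reply is needed): keep request/reply but shorten the wait
     from the 20000 ms seen in the log to an illustrative 5000 ms -->
<to uri="jms:getInfo?requestTimeout=5000" />
```

Only one of the two would replace the existing `<to uri="jms:getInfo" />` in the route; they are shown together for comparison.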
jms:getInfo endpoint in your camel.xml – Pith