Note: I am using Spring Integration 3.0.5.RELEASE due to dependency requirements for the project that I am working on.
I have configured a connection factory with a redelivery policy (we are using Active MQ 5.7)
<amq:connectionFactory id="cms.jmsConnectionFactory"
brokerURL="${cms.jms.brokerURL}" p:redeliveryPolicy-ref="cms.jms.redeliveryPolicy"
p:prefetchPolicy-ref="cms.jms.prefetchPolicy" p:userName="${cms.jms.userName}" p:password="${cms.jms.password}"/>
<amq:redeliveryPolicy id="cms.jms.redeliveryPolicy"
p:backOffMultiplier="5"
p:initialRedeliveryDelay="1000"
p:maximumRedeliveries="6"
p:redeliveryDelay="1000"
p:useExponentialBackOff="false"
p:useCollisionAvoidance="false" />
I have a service activator which is activated via messages on a message-driven-channel-adapter
<int-jms:message-driven-channel-adapter channel="cms.jms.archiveNodeChannel"
connection-factory="cms.jms.cachedConnectionFactory" destination="cms.jms.archiveNodeDestination" transaction-manager="cms.jms.txManager"
acknowledge="transacted" concurrent-consumers="1" max-concurrent-consumers="1" cache-level="3"/>
<int:service-activator ref="cms.int.nodeArchiver" method="archiveNode" input-channel="cms.jms.archiveNodeChannel"/>
In some cases the service activator can throw a RuntimeException. The behaviour that I would like is for the message to be re-delivered 6 times as per the maximumRedeliveries property on the redelivery policy and then for an error handler to be invoked at the Spring Integration layer however what is actually happening when a RuntimeException is thrown is that the message is redelivered endlessly until I kill the application. I have tried adding an error-channel to the message-driven-channel-adapter but this is never activated.
This post is slightly old but suggests that in a JMS Message-driven scenario. The DefaultMessageListenerContainer is the "caller" and that a RuntimeException (in the AbstractMessageListenerContainer base class) is simply logged.
It may be that I have to extend the DefaultMessageListenerContainer to handle RuntimeExceptions differently but I am wondering if anyone can point me towards a more elegant solution or tell me that I am just plain wrong in my approach.
--- JMS Logging (I have indented to make it more readable). The logs show the details from the time of the first Error "No file type was found for file 9789814316385.txt" to the next time of the same error (This trace is repeated ad-infinitum until the system is shut down.) Indeed the redeliveryCounter is always to 0 I am not sure why this is the case
ERROR [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] No file type was found for file 9789814316385.txt
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Initiating transaction commit
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:6,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Creating new transaction with name
[org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Created JMS transaction on Session
[Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:6,started=true}]
from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1]
org.springframework.integration.jms.JmsSendingMessageHandler#4 received message: [Payload=]
[Headers={sequenceNumber=1, sequenceSize=1, jms_timestamp=1437721454735, , jms_redelivered=false, ,
jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:818, timestamp=1437721454738}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Executing callback on JMS Session: Cached JMS Session:
ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Sending created message: ActiveMQTextMessage
{commandId = 0, responseRequired = false, messageId = null, originalDestination = null,
originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0,
properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454738}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = }
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] postSend (sent=true) on channel 'cms.jms.archiveNodeChannel', message: [Payload=]
[Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454735, , jms_redelivered=false, ,
jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:818, timestamp=1437721454738}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Initiating transaction commit
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:7,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Creating new transaction with name
[org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Created JMS transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:7,started=true}]
from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Received message of type [class org.apache.activemq.command.ActiveMQTextMessage]
from consumer [Cached JMS MessageConsumer: ActiveMQMessageConsumer { value=ID:simac.home-56110-1437721286921-0:1:2:1, started=true }] of session
[Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:2,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] converted JMS Message [ActiveMQTextMessage
{commandId = 4953, responseRequired = false, messageId = ID:simac.home-56110-1437721286921-0:1:7:1:819, originalDestination = null,
originalTransactionId = null, producerId = ID:simac.home-56110-1437721286921-0:1:7:1, destination = queue://cms.jms.queue.archiveNode
transactionId = TX:ID:simac.home-56110-1437721286921-0:1:1644, expiration = 0, timestamp = 1437721454772,
arrival = 0, brokerInTime = 1437721454773, brokerOutTime = 1437721454774, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@3ee1137b,
dataStructure = null, redeliveryCounter = 0, size = 0, properties =
{sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454738}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = }]
to integration Message payload []
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] preSend on channel 'cms.jms.archiveNodeChannel',
message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, ,
jms_redelivered=false, , , id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, ,
, jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] org.springframework.integration.jms.JmsSendingMessageHandler#4 received message:
[Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, , jms_redelivered=false, , ,
id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Executing callback on JMS Session: Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:7,started=true}
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Sending created message: ActiveMQTextMessage
{commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null,
destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0,
targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null,
redeliveryCounter = 0, size = 0, properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454775},
readOnlyProperties = false, readOnlyBody = false, droppable = false, text = }
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] postSend (sent=true) on channel 'cms.jms.archiveNodeChannel', message:
[Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, , jms_redelivered=false, , ,
id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Initiating transaction commit
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:7,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Creating new transaction with name
[org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Created JMS transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:simac.home-56110-1437721286921-0:1:7,started=true}] from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,
clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Received message of type [class org.apache.activemq.command.ActiveMQTextMessage]
from consumer [Cached JMS MessageConsumer: ActiveMQMessageConsumer { value=ID:simac.home-56110-1437721286921-0:1:2:1, started=true }]
of session [Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:2,started=true}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] converted JMS Message [ActiveMQTextMessage
{commandId = 4959, responseRequired = false, messageId = ID:simac.home-56110-1437721286921-0:1:7:1:820, originalDestination = null,
originalTransactionId = null, producerId = ID:simac.home-56110-1437721286921-0:1:7:1, destination = queue://cms.jms.queue.archiveNode,
transactionId = TX:ID:simac.home-56110-1437721286921-0:1:1646, expiration = 0, timestamp = 1437721454775, arrival = 0, brokerInTime = 1437721454776,
brokerOutTime = 1437721454777, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@214372df,
dataStructure = null, redeliveryCounter = 0, size = 0, properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454775},
readOnlyProperties = true, readOnlyBody = true, droppable = false, text = }] to integration Message payload []
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] preSend on channel 'cms.jms.archiveNodeChannel',
message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454775, , jms_redelivered=false, , ,
id=22069a85-30c5-5478-1b6c-10b6264575b1, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:820, timestamp=1437721454778}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] ServiceActivator for
[org.springframework.integration.handler.MethodInvokingMessageProcessor@755cd5ee] received message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1,
, , jms_timestamp=1437721454775, , jms_redelivered=false, , , id=22069a85-30c5-5478-1b6c-10b6264575b1, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:820,
timestamp=1437721454778}]
ERROR [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] No file type was found for file 9789814316385.txt
Having attached a debugger I can see the reason for the exception that is thrown by the service activator not propagating:
The problem occurs in org.springframework.integration.dispatcher.UnicastingDispatcher
private boolean doDispatch(Message<?> message) {
boolean success = false;
Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
if (!handlerIterator.hasNext()) {
throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
}
List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
while (success == false && handlerIterator.hasNext()) {
MessageHandler handler = handlerIterator.next();
try {
handler.handleMessage(message);
success = true; // we have a winner.
}
catch (Exception e) {
RuntimeException runtimeException = (e instanceof RuntimeException)
? (RuntimeException) e
: new MessageDeliveryException(message,
"Dispatcher failed to deliver Message.", e);
if (e instanceof MessagingException &&
((MessagingException) e).getFailedMessage() == null) {
((MessagingException) e).setFailedMessage(message);
}
exceptions.add(runtimeException);
this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
}
}
return success;
}
The doDispatch shown above loops over each MessageHander returned by Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
breaking if one handles the message successfully.
The handlerIterator contains 2 MessageHandlers, the first is my Service Activator (which throws the RuntimeException) the second is org.springframework.integration.jms.JmsSendingMessageHandler (I am not sure why this is here)
When the Service Activator throws the exception it is passed to the handleExceptions method shown below:
/**
* Handles Exceptions that occur while dispatching. If this dispatcher has
* failover enabled, it will only throw an Exception when the handler list
* is exhausted. The 'isLast' flag will be <em>true</em> if the
* Exception occurred during the final iteration of the MessageHandlers.
* If failover is disabled for this dispatcher, it will re-throw any
* Exception immediately.
*/
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
if (isLast || !this.failover) {
if (allExceptions != null && allExceptions.size() == 1) {
throw allExceptions.get(0);
}
throw new AggregateMessageDeliveryException(message,
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
}
}
Because the Service Activator is not the last handler (isLast = false) the exception is not thrown. The second JmsSendingMessageHandler handles the message (although I don't know why) and the message just keeps being redelivered. Does anyone know why this JmsSendingMessageHandler is also attempting to handle the message designated for the Service Activator?
redelivery count
header. Otherwise your ActiveMQ jsut doesn't work properly! – Artem Bilanredelivery count
does always equal 0. There is also no stack trace. Do you have any idea what might be causing this? – Simon Hutchinsonchannel
. Just make a seek throughout your configuration and try to find who else uses yourcms.jms.archiveNodeChannel
. In this case it must be<int-jms:outbound-channel-adapter>
– Artem Bilan