Note: I am using Spring Integration 3.0.5.RELEASE due to dependency requirements for the project that I am working on.

I have configured a connection factory with a redelivery policy (we are using ActiveMQ 5.7):

<amq:connectionFactory id="cms.jmsConnectionFactory"
    brokerURL="${cms.jms.brokerURL}" p:redeliveryPolicy-ref="cms.jms.redeliveryPolicy"
    p:prefetchPolicy-ref="cms.jms.prefetchPolicy" p:userName="${cms.jms.userName}" p:password="${cms.jms.password}"/>


<amq:redeliveryPolicy id="cms.jms.redeliveryPolicy"
    p:backOffMultiplier="5"
    p:initialRedeliveryDelay="1000"
    p:maximumRedeliveries="6"
    p:redeliveryDelay="1000"
    p:useExponentialBackOff="false"
    p:useCollisionAvoidance="false" />

I have a service activator that is triggered by messages arriving on a message-driven-channel-adapter:

<int-jms:message-driven-channel-adapter channel="cms.jms.archiveNodeChannel"
    connection-factory="cms.jms.cachedConnectionFactory" destination="cms.jms.archiveNodeDestination" transaction-manager="cms.jms.txManager"
    acknowledge="transacted" concurrent-consumers="1" max-concurrent-consumers="1" cache-level="3"/>

<int:service-activator ref="cms.int.nodeArchiver" method="archiveNode" input-channel="cms.jms.archiveNodeChannel"/>

In some cases the service activator can throw a RuntimeException. The behaviour I would like is for the message to be redelivered 6 times, as per the maximumRedeliveries property on the redelivery policy, and then for an error handler to be invoked at the Spring Integration layer. What actually happens when a RuntimeException is thrown is that the message is redelivered endlessly until I kill the application. I have tried adding an error-channel to the message-driven-channel-adapter, but it is never activated.

This post is slightly old, but it suggests that in a JMS message-driven scenario the DefaultMessageListenerContainer is the "caller" and that a RuntimeException (in the AbstractMessageListenerContainer base class) is simply logged.

It may be that I have to extend the DefaultMessageListenerContainer to handle RuntimeExceptions differently but I am wondering if anyone can point me towards a more elegant solution or tell me that I am just plain wrong in my approach.

--- JMS logging (indented to make it more readable). The logs cover the period from the first error, "No file type was found for file 9789814316385.txt", to the next occurrence of the same error. This trace repeats ad infinitum until the system is shut down. The redeliveryCounter is always 0, and I am not sure why.

ERROR  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] No file type was found for file 9789814316385.txt
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Initiating transaction commit
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession 
   {id=ID:simac.home-56110-1437721286921-0:1:6,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Creating new transaction with name 
   [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Created JMS transaction on Session 
   [Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:6,started=true}] 
   from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] 
   org.springframework.integration.jms.JmsSendingMessageHandler#4 received message: [Payload=]
[Headers={sequenceNumber=1, sequenceSize=1, jms_timestamp=1437721454735, , jms_redelivered=false, , 
   jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:818, timestamp=1437721454738}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Executing callback on JMS Session: Cached JMS Session:
 ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Sending created message: ActiveMQTextMessage 
   {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, 
   originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, 
   correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, 
   compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, 
   properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454738}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = }
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] postSend (sent=true) on channel 'cms.jms.archiveNodeChannel', message: [Payload=]
   [Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454735, , jms_redelivered=false, , 
   jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:818, timestamp=1437721454738}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Initiating transaction commit
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession
 {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Creating new transaction with name 
   [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Created JMS transaction on Session [Cached JMS Session: ActiveMQSession
 {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}] 
from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Received message of type [class org.apache.activemq.command.ActiveMQTextMessage] 
   from consumer [Cached JMS MessageConsumer: ActiveMQMessageConsumer { value=ID:simac.home-56110-1437721286921-0:1:2:1, started=true }] of session 
   [Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:2,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] converted JMS Message [ActiveMQTextMessage 
   {commandId = 4953, responseRequired = false, messageId = ID:simac.home-56110-1437721286921-0:1:7:1:819, originalDestination = null, 
   originalTransactionId = null, producerId = ID:simac.home-56110-1437721286921-0:1:7:1, destination = queue://cms.jms.queue.archiveNode 
   transactionId = TX:ID:simac.home-56110-1437721286921-0:1:1644, expiration = 0, timestamp = 1437721454772, 
   arrival = 0, brokerInTime = 1437721454773, brokerOutTime = 1437721454774, correlationId = null, replyTo = null, 
   persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, 
   compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@3ee1137b, 
   dataStructure = null, redeliveryCounter = 0, size = 0, properties = 
   {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454738}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = }]
   to integration Message payload []
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] preSend on channel 'cms.jms.archiveNodeChannel', 
   message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, , 
   jms_redelivered=false, , , id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, , 
   , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] org.springframework.integration.jms.JmsSendingMessageHandler#4 received message: 
   [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, , jms_redelivered=false, , , 
   id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Executing callback on JMS Session: Cached JMS Session: ActiveMQSession
   {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Sending created message: ActiveMQTextMessage 
   {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null,
   destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, 
   correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, 
   targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, 
   redeliveryCounter = 0, size = 0, properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454775}, 
   readOnlyProperties = false, readOnlyBody = false, droppable = false, text = }
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] postSend (sent=true) on channel 'cms.jms.archiveNodeChannel', message: 
   [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, , jms_redelivered=false, , , 
   id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Initiating transaction commit
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession 
   {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Creating new transaction with name 
   [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Created JMS transaction on Session [Cached JMS Session: ActiveMQSession 
   {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}] from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,
   clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Received message of type [class org.apache.activemq.command.ActiveMQTextMessage] 
   from consumer [Cached JMS MessageConsumer: ActiveMQMessageConsumer { value=ID:simac.home-56110-1437721286921-0:1:2:1, started=true }]
   of session [Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:2,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] converted JMS Message [ActiveMQTextMessage 
   {commandId = 4959, responseRequired = false, messageId = ID:simac.home-56110-1437721286921-0:1:7:1:820, originalDestination = null, 
   originalTransactionId = null, producerId = ID:simac.home-56110-1437721286921-0:1:7:1, destination = queue://cms.jms.queue.archiveNode, 
   transactionId = TX:ID:simac.home-56110-1437721286921-0:1:1646, expiration = 0, timestamp = 1437721454775, arrival = 0, brokerInTime = 1437721454776, 
   brokerOutTime = 1437721454777, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, 
   groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@214372df,
   dataStructure = null, redeliveryCounter = 0, size = 0, properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454775}, 
   readOnlyProperties = true, readOnlyBody = true, droppable = false, text = }] to integration Message payload []
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] preSend on channel 'cms.jms.archiveNodeChannel', 
   message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454775, , jms_redelivered=false, , , 
   id=22069a85-30c5-5478-1b6c-10b6264575b1, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:820, timestamp=1437721454778}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] ServiceActivator for 
   [org.springframework.integration.handler.MethodInvokingMessageProcessor@755cd5ee] received message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1, 
   , , jms_timestamp=1437721454775, , jms_redelivered=false, , , id=22069a85-30c5-5478-1b6c-10b6264575b1, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:820, 
   timestamp=1437721454778}]
ERROR  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] No file type was found for file 9789814316385.txt

Having attached a debugger, I can see why the exception thrown by the service activator is not propagating.

The problem occurs in org.springframework.integration.dispatcher.UnicastingDispatcher:

private boolean doDispatch(Message<?> message) {
    boolean success = false;
    Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
    if (!handlerIterator.hasNext()) {
        throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
    }
    List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
    while (success == false && handlerIterator.hasNext()) {
        MessageHandler handler = handlerIterator.next();
        try {
            handler.handleMessage(message);
            success = true; // we have a winner.
        }
        catch (Exception e) {
            RuntimeException runtimeException = (e instanceof RuntimeException)
                    ? (RuntimeException) e
                    : new MessageDeliveryException(message,
                            "Dispatcher failed to deliver Message.", e);
            if (e instanceof MessagingException &&
                    ((MessagingException) e).getFailedMessage() == null) {
                ((MessagingException) e).setFailedMessage(message);
            }
            exceptions.add(runtimeException);
            this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
        }
    }
    return success;
}

The doDispatch method shown above loops over each MessageHandler returned by this.getHandlerIterator(message), stopping as soon as one handles the message successfully.

The handlerIterator contains 2 MessageHandlers: the first is my Service Activator (which throws the RuntimeException) and the second is an org.springframework.integration.jms.JmsSendingMessageHandler (I am not sure why this is here).

When the Service Activator throws the exception it is passed to the handleExceptions method shown below:

/**
 * Handles Exceptions that occur while dispatching. If this dispatcher has
 * failover enabled, it will only throw an Exception when the handler list
 * is exhausted. The 'isLast' flag will be <em>true</em> if the
 * Exception occurred during the final iteration of the MessageHandlers.
 * If failover is disabled for this dispatcher, it will re-throw any
 * Exception immediately.
 */
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
    if (isLast || !this.failover) {
        if (allExceptions != null && allExceptions.size() == 1) {
            throw allExceptions.get(0);
        }
        throw new AggregateMessageDeliveryException(message,
                "All attempts to deliver Message to MessageHandlers failed.", allExceptions);
    }
}

Because the Service Activator is not the last handler (isLast = false), the exception is not thrown. The second handler, the JmsSendingMessageHandler, then handles the message (although I don't know why), and the message just keeps being redelivered. Does anyone know why this JmsSendingMessageHandler is also attempting to handle the message designated for the Service Activator?
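
To make the behaviour described above concrete, here is a minimal plain-Java model of failover dispatch. It is only a sketch, not Spring Integration code: the class UnicastFailoverSketch, its dispatch method, and the two stand-in handlers are hypothetical names, and it simplifies the real dispatcher by rethrowing only the most recent exception instead of aggregating them. It shows that with failover enabled and a second subscriber on the channel, the first handler's RuntimeException never reaches the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of failover dispatch; hypothetical, not the Spring Integration class. */
public class UnicastFailoverSketch {

    /** Records which stand-in handlers were invoked, in order. */
    static final List<String> invoked = new ArrayList<>();

    /** Tries each handler in turn; an exception only escapes if it is the last handler or failover is off. */
    static boolean dispatch(List<Consumer<String>> handlers, String message, boolean failover) {
        for (int i = 0; i < handlers.size(); i++) {
            boolean isLast = (i == handlers.size() - 1);
            try {
                handlers.get(i).accept(message);
                return true; // the first handler that succeeds ends the loop
            } catch (RuntimeException e) {
                if (isLast || !failover) {
                    throw e; // only here does the caller see the failure
                }
                // otherwise the exception is swallowed and the next handler is tried
            }
        }
        return false;
    }

    /** Stand-in for the service activator that always fails. */
    static Consumer<String> failingActivator() {
        return message -> {
            invoked.add("serviceActivator");
            throw new IllegalStateException("No file type was found for " + message);
        };
    }

    /** Stand-in for the unexpected second subscriber; it always succeeds. */
    static Consumer<String> jmsSender() {
        return message -> invoked.add("jmsSender");
    }

    public static void main(String[] args) {
        // Two subscribers: the failure is hidden by the second handler.
        boolean handled = dispatch(Arrays.asList(failingActivator(), jmsSender()), "node-1", true);
        System.out.println("handled=" + handled + ", invoked=" + invoked);

        // One subscriber: the same failure now propagates to the caller.
        invoked.clear();
        try {
            dispatch(Collections.singletonList(failingActivator()), "node-1", true);
        } catch (IllegalStateException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
    }
}
```

If the second handler really is a JMS outbound adapter pointing at the same queue, each swallowed failure would put a fresh copy of the message back on the queue. That would be consistent with the logs above, where every delivery has a new jms_messageId and redeliveryCounter = 0, but it is an inference from the trace rather than something I have confirmed.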

Everything sounds good and correct from the premise perspective. So please share the stack trace from when your service-activator throws that exception, plus the DEBUG logs for the JMS message. They must contain some redelivery count header. Otherwise your ActiveMQ just doesn't work properly! - Artem Bilan
Hi Artem, thanks for replying. I have edited my post to include the DEBUG logs. The redelivery count always equals 0. There is also no stack trace. Do you have any idea what might be causing this? - Simon Hutchinson
Sorry, I don't see anything specific. Maybe it would be better if you shared your config so that we can reproduce it as a test case in our environment and see what's going on in DEBUG. - Artem Bilan
Thanks again for your time Artem. I will get a test case together. In the meantime I have attached a debugger which shows (in part) why the exception thrown by my service activator is not propagating. I have updated my post with my findings. Can you offer any further ideas based on my findings? - Simon Hutchinson
:-D. That's exactly the issue! You have a second subscriber to the same channel. Just search through your configuration and try to find what else uses your cms.jms.archiveNodeChannel. In this case it must be an <int-jms:outbound-channel-adapter>. - Artem Bilan

1 Answer

The <publish-subscribe-channel> is for you! You really can add all those components as subscribers to it, and they will be called in the order in which they are declared in the config or according to their pre-configured order attribute.
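
A rough plain-Java model of what that means, as a sketch only: BroadcastDispatchSketch, publish, and named are hypothetical names, not Spring Integration APIs. It assumes the channel is left in its default mode, where a subscriber's failure is not ignored. Every subscriber receives the message in order, and a RuntimeException from one of them goes straight back to the caller instead of triggering failover to the next subscriber.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of broadcast dispatch; hypothetical, not the Spring Integration class. */
public class BroadcastDispatchSketch {

    /** Records which stand-in subscribers were invoked, in order. */
    static final List<String> invoked = new ArrayList<>();

    /** Delivers the message to every subscriber in order; any exception propagates immediately. */
    static void publish(List<Consumer<String>> subscribers, String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    /** Builds a stand-in subscriber that records its name and optionally fails. */
    static Consumer<String> named(String name, boolean fail) {
        return message -> {
            invoked.add(name);
            if (fail) {
                throw new IllegalStateException(name + " failed for " + message);
            }
        };
    }

    public static void main(String[] args) {
        // Both subscribers succeed: both are invoked, in declaration order.
        publish(Arrays.asList(named("jmsSender", false), named("serviceActivator", false)), "node-1");
        System.out.println("invoked=" + invoked);

        // The first subscriber fails: the caller sees the exception and the second is not invoked.
        invoked.clear();
        try {
            publish(Arrays.asList(named("serviceActivator", true), named("jmsSender", false)), "node-1");
        } catch (IllegalStateException e) {
            System.out.println("caller saw: " + e.getMessage() + ", invoked=" + invoked);
        }
    }
}
```

In the message-driven scenario from the question, the caller is the listener container's transacted session, so an exception that propagates like this is what would give the broker a chance to apply its redelivery policy.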