I've got a project that uses Spring Integration to read messages off of a JMS Queue, process them and call a third party system with the results. If it fails, it then rolls the messages back on to the JMS Queue to be retried later.

This all works great. What I want to do now is record something akin to access logs, so that our monitoring tools can track throughput the same way they do for everything else.

I think what I want to do is have some code that wraps the entire JMS Transaction, allowing me to record a Start time, a Stop Time and whether the Transaction was a success or a failure. I can then write my access log messages based on that information and we can get it into our monitoring tools.

So - how do I do that? :)

My Spring Integration Channel Adapter looks like this:

<int-jms:message-driven-channel-adapter connection-factory="connectionFactory"
                         destination="syncQueue"
                         acknowledge="transacted"
                         channel="syncInputChannel"
                         concurrent-consumers="${jms.concurrentConsumers}" />

And my Connection Factory like this:

<bean id="IBMMQConnectionFactory" class="com.ibm.mq.jms.MQConnectionFactory" lazy-init="true">
    <property name="queueManager" value="${jms.queueManager}"/>
    <property name="hostName" value="${jms.hostName}"/>
    <property name="channel" value="${jms.channel}"/>
    <property name="port" value="${jms.port}"/>
    <property name="transportType" value="1"/>
</bean>

<bean id="syncQueue" class="com.ibm.mq.jms.MQQueue" lazy-init="true">
    <property name="baseQueueName" value="${jms.queueName}" />
</bean>

I can't see anything obvious I can attach to either the Connection Factory or the Channel Adapter that will let me do what I want.

I did try a ChannelInterceptor, but this doesn't quite work. I get called on Start and on Success, but not on Failure.

Cheers

2 Answers

0
votes

I did try a ChannelInterceptor, but this doesn't quite work.

What version of Spring Integration are you using? Since 4.0.x, the ChannelInterceptor has additional methods, including

void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex);

If the exception is not null, it means the transaction will be rolled back.
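As a rough illustration of the bookkeeping such an interceptor could do, here is a minimal sketch. AccessLogRecorder and its methods are hypothetical names, not Spring API, and the sketch deliberately avoids Spring imports so it stands alone. The assumption is that preSend() would call start() and afterSendCompletion() would call finish(sent, ex), and that both callbacks run on the same listener thread, which only holds if the downstream flow is synchronous.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper (not part of Spring Integration) that pairs start and completion callbacks. */
final class AccessLogRecorder {

    // Start timestamps per thread; a stack so that nested sends on one thread pair up correctly.
    private final ThreadLocal<Deque<Long>> startTimes = ThreadLocal.withInitial(ArrayDeque::new);

    /** Intended to be called from ChannelInterceptor.preSend(...). */
    void start() {
        startTimes.get().push(System.currentTimeMillis());
    }

    /**
     * Intended to be called from ChannelInterceptor.afterSendCompletion(...),
     * passing through its 'sent' and 'ex' arguments.
     * Returns one access-log line; 'start' is null if start() was never called.
     */
    String finish(boolean sent, Exception ex) {
        long stop = System.currentTimeMillis();
        Long start = startTimes.get().poll();
        // A non-null exception means the JMS transaction will be rolled back.
        String outcome = (ex == null && sent) ? "SUCCESS" : "FAILURE";
        return "start=" + start + " stop=" + stop + " outcome=" + outcome;
    }
}
```

The returned line could then be handed to whatever logger your monitoring tools read from.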

You could also apply AOP advice to the channel's send() methods.
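The advice idea can be sketched without Spring using a plain JDK dynamic proxy. This is a stand-in for illustration only: Sender is a hypothetical interface playing the role of the channel, and TimingAdvice is a hypothetical name. In a real application you would more likely write the same try/finally logic in a Spring AOP advice applied to the channel bean.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.function.Consumer;

/** Hypothetical stand-in for a message channel. */
interface Sender {
    boolean send(String message);
}

/** Hypothetical "around advice" built on a JDK dynamic proxy. */
final class TimingAdvice {

    /** Wraps target so that every send() reports start, stop and outcome to sink. */
    static Sender wrap(Sender target, Consumer<String> sink) {
        return (Sender) Proxy.newProxyInstance(
                Sender.class.getClassLoader(),
                new Class<?>[] {Sender.class},
                (proxy, method, args) -> {
                    // Only time send(); pass toString(), hashCode() etc. straight through.
                    boolean timed = method.getName().equals("send");
                    long start = System.currentTimeMillis();
                    String outcome = "FAILURE";
                    try {
                        Object result = method.invoke(target, args);
                        if (Boolean.TRUE.equals(result)) {
                            outcome = "SUCCESS";
                        }
                        return result;
                    } catch (InvocationTargetException e) {
                        // Rethrow the original exception so the caller still sees the failure.
                        throw e.getCause();
                    } finally {
                        if (timed) {
                            sink.accept("start=" + start + " stop=" + System.currentTimeMillis()
                                    + " outcome=" + outcome);
                        }
                    }
                });
    }
}
```

Because the exception is rethrown unchanged, wrapping the call this way should not interfere with the rollback; it only observes the outcome.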

0
votes

The transaction is started by the MessageListenerContainer, and your ChannelInterceptor is called within it. However, if your downstream code throws an exception, the ChannelInterceptor may still help. The relevant logic is in AbstractMessageChannel:

public boolean send(Message<?> message, long timeout) {
    ...
    try {
        if (interceptors.getSize() > 0) {
            interceptorStack = new ArrayDeque<ChannelInterceptor>();
            message = interceptors.preSend(message, this, interceptorStack);
            if (message == null) {
                return false;
            }
        }
        ...
        sent = this.doSend(message, timeout);
        ...
        if (interceptorStack != null) {
            interceptors.postSend(message, this, sent);
            // Normal completion: the exception argument is null.
            interceptors.afterSendCompletion(message, this, sent, null, interceptorStack);
        }
        return sent;
    }
    catch (Exception e) {
        ...
        if (interceptorStack != null) {
            // Failure: the same callback receives the exception.
            interceptors.afterSendCompletion(message, this, sent, e, interceptorStack);
        }
        if (e instanceof MessagingException) {
            throw (MessagingException) e;
        }
        throw new MessageDeliveryException(message,
                "failed to send Message to channel '" + this.getComponentName() + "'", e);
    }
}

As you can see, interceptors.afterSendCompletion() is also called from the catch (Exception e) block, with the exception passed in. That is how your interceptor can detect the rollback case.

Unfortunately, DefaultMessageListenerContainer has no adviceChain option like the Spring AMQP container does, so I can't suggest an alternative that wraps the transaction itself in an Advice.