This is my first post so hopefully I don't mess the formatting up too badly.

I have a Spring Integration flow that routes a message as follows:

Gateway -> router -> outbound-jms-channel -> message-driven-channel -> service-activator

I'm using Spring Integration 4.0.3 and ActiveMQ 5.10.0. In this particular instance I'm running within a WebLogic container (although this will not always be the case).

The route posts a JMS message to an ActiveMQ queue. If I encounter a runtime exception during execution of the listener, I expect any transactional resources enlisted within the listener to be rolled back and the message to be redelivered. Currently I am seeing the rollback but no redelivery of the message.

My spring configuration is as follows:

 <!-- TaskExecutor is defined elsewhere and is our own object, it's effectively the equivalent of calling Executors.newCachedThreadPool(). -->

<bean id="TransactionManager" class="org.springframework.transaction.jta.WebLogicJtaTransactionManager"/>

<bean id="XADataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="${xaJndiName}"/>
</bean>

<amq:broker brokerName="messageBroker" useJmx="true" persistent="true" start="true" schedulerSupport="false">
    <amq:transportConnectors>
        <amq:transportConnector id="tcpConnector"  uri="tcp://garethDell.leeds.retailexp.com:9783"/>
    </amq:transportConnectors>
    <amq:persistenceAdapter>
        <amq:jdbcPersistenceAdapter dataSource="#XADataSource"/>
    </amq:persistenceAdapter>
</amq:broker>

<amq:xaConnectionFactory id="amqConnectionFactory" brokerURL="tcp://garethDell.leeds.retailexp.com:9783" useAsyncSend="false">
    <amq:prefetchPolicy>
        <amq:prefetchPolicy all="1"/>
    </amq:prefetchPolicy>
    <amq:redeliveryPolicy>
        <amq:redeliveryPolicy id="redeliveryPolicy" initialRedeliveryDelay="5000" maximumRedeliveries="5" useExponentialBackOff="true" backOffMultiplier="2" queue="*" />
    </amq:redeliveryPolicy>
</amq:xaConnectionFactory>

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
    <property name="reconnectOnException" value="true"/>
</bean>

<int:gateway id="messageGateway" service-interface="com.retailexp.amp.server.spring.MessageGateway" default-request-channel="requestChannel"/>
<bean id="MessagePublisher" class="com.retailexp.amp.server.common.publishing.impl.MessagePublisher">
    <property name="gateway" ref="messageGateway"/>
</bean>

<!-- Routes the messages to the appropriate destination, each queue / topic should have an appropriate channel associated with it which must be registered here-->
<int:channel id="requestChannel"/>
<int:header-value-router id="destinationRouter" input-channel="requestChannel" header-name="destination">
    <int:mapping value="test" channel="testOutChannel"/>
</int:header-value-router>

<int:channel id="testOutChannel"/>
<int-jms:outbound-channel-adapter id="testOut" channel="testOutChannel" connection-factory="connectionFactory" destination="testQueue"/>

<int:channel id="testListener"/>
<int-jms:message-driven-channel-adapter id="testIn" connection-factory="connectionFactory" container="testQueueContainer" channel="testListener"/>

<!-- Define the queues -->
<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test"/>
</bean>

<!-- Define the message listeners -->
<bean id="testMessageHandler" class="com.retailexp.amp.server.spring.TestListener"/>
<int:service-activator input-channel="testListener" ref="testMessageHandler" method="onMessage"/>

<bean id="testQueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="transactionManager" ref="TransactionManager"/>
    <property name="taskExecutor" ref="TaskExecutor"/>
    <property name="maxConcurrentConsumers" value="10"/>
    <property name="sessionTransacted" value="true"/>
    <property name="sessionAcknowledgeMode" value="0"/>
    <property name="destination" ref="testQueue"/>
    <property name="cacheLevelName" value="CACHE_CONSUMER"/>
</bean>

My listener is very straightforward at this stage: it logs the message, saves one of our domain objects and finally throws a runtime exception to force rollback. The rollback appears in the logs and the database changes are correctly rolled back, so I know that I am getting transaction rollback within the listener. I assume that the consumer isn't enlisting in the XA transaction when it should be, so the consumption of the message is committed as part of a separate transaction (or is somehow non-transactional?).

I am seeing an error in TransactionContext when it sets the transaction isolation level, caused by the XA JDBC driver. However, I don't think this is the cause of the problem: I have tried various isolation levels, e.g. 8 (TRANSACTION_SERIALIZABLE), so I think it's merely a limitation of the driver itself.

Exception:

[11 Sep 2014 16:53:07,464] [ActiveMQ Transport: tcp:///192.168.50.100:58292@9783  ] [            ] [            ] TRACE jdbc.TransactionContext                  - Cannot set transaction isolation to 1 due Due to vendor limitations, setting transaction isolation for "Oracle XA" JDBC XA driver is not supported.. This exception is ignored.
java.sql.SQLException: Due to vendor limitations, setting transaction isolation for "Oracle XA" JDBC XA driver is not supported.
    at weblogic.jdbc.wrapper.JTAConnection.setTransactionIsolation(JTAConnection.java:492)
    at org.apache.activemq.store.jdbc.TransactionContext.getConnection(TransactionContext.java:74)
    at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doAddMessage(DefaultJDBCAdapter.java:220)
    at org.apache.activemq.store.jdbc.JDBCMessageStore.addMessage(JDBCMessageStore.java:126)
    at org.apache.activemq.store.memory.MemoryTransactionStore.addMessage(MemoryTransactionStore.java:343)
    at org.apache.activemq.store.memory.MemoryTransactionStore$1.asyncAddQueueMessage(MemoryTransactionStore.java:159)
    at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:910)
    at org.apache.activemq.broker.region.Queue.send(Queue.java:733)
    at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:424)
    at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:445)
    at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
    at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
    at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:307)
    at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:152)
    at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:496)
    at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:756)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    at java.lang.Thread.run(Thread.java:722)

I have trace-level logs available if they're required. They're very verbose, though, and were pushing me over the character limit for this post. Let me know if they'd be helpful and I'll try to make them available.

Can anyone see anything obviously wrong with the above configuration?

Edit: Trace log available here: http://pastebin.com/0awmzY2D

Thanks

It would be better if you showed the stack trace for the transaction rollback rather than the Oracle XA limitation message. Anyway, that message just tells you that you can't do that. - Artem Bilan
Thanks for the response. The trace isn't hugely helpful on the rollback: the transaction manager shows the rollback request and then the initiated rollback, with no error stack or anything. The full trace log is available here: pastebin.com/0awmzY2D - Gareth Jenkins
Looks like something is wrong with ActiveMQ: DEBUG listener.DefaultMessageListenerContainer - Rolling back transaction because of listener exception thrown: org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: Rollback please???!. Maybe the messages aren't persistent? - Artem Bilan
The messages seem to be persisted to the database correctly. If I put a breakpoint in the consumer before the message is consumed, I can query the ACTIVEMQ_MSGS table and see the message, although the XID is null at this stage. - Gareth Jenkins

1 Answer

Turns out that there was no XID on the TransactionContext held by the session at the time the message hit the listener. Effectively the session wasn't being enlisted in the transaction so I was getting a new transaction that just wrapped the listener code.

I did some poking around to find where the enlistment should occur, and it looks like the XA connection factory doesn't do anything to enlist the session with the transaction when it creates it. In retrospect I'm not sure why I thought it would, as it has no reference to the transaction manager.

Changing to an XaPooledConnectionFactory results in the use of an XaConnectionPool, which does perform automatic enlistment. The configuration changed as follows:

<bean id="connectionFactory" class="org.apache.activemq.jms.pool.XaPooledConnectionFactory">
    <property name="connectionFactory" ref="amqConnectionFactory"/>
    <property name="tmJndiName" value="weblogic.transaction.TransactionManager"/>
    <property name="tmFromJndi" value="true"/>
</bean>

<bean id="testQueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="transactionManager" ref="TransactionManager"/>
    <property name="taskExecutor" ref="TaskExecutor"/>
    <property name="maxConcurrentConsumers" value="10"/>
    <property name="sessionTransacted" value="true"/>
    <property name="destination" ref="testQueue"/>
</bean>
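
Since the deployment will not always be WebLogic, the container-specific JNDI lookup could in principle be replaced by handing the pool the JTA transaction manager directly. The following is a minimal, untested sketch and not part of the original configuration. It assumes that the XA factory keeps the id `amqConnectionFactory` from the question, that `XaPooledConnectionFactory` accepts a `transactionManager` property, and that the Spring `TransactionManager` bean (a `JtaTransactionManager` subclass) exposes the underlying `javax.transaction.TransactionManager` through `getTransactionManager()`.

```xml
<!-- Sketch only: inject the JTA TransactionManager instead of looking it up via JNDI -->
<bean id="connectionFactory" class="org.apache.activemq.jms.pool.XaPooledConnectionFactory">
    <!-- Assumed id of the amq:xaConnectionFactory defined in the question -->
    <property name="connectionFactory" ref="amqConnectionFactory"/>
    <!-- SpEL reads the javax.transaction.TransactionManager held by the Spring JTA transaction manager bean -->
    <property name="transactionManager" value="#{TransactionManager.transactionManager}"/>
</bean>
```

This variant is only an alternative wiring; the JNDI-based configuration shown above is the one that was actually tried.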

I'm now getting message redelivery. I still have a problem where the redelivery delay and back-off policy aren't being adhered to (although, strangely, maximumRedeliveries is), but I'm sure I can work that out.
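
On the redelivery delay and back-off: ActiveMQ normally applies the `redeliveryPolicy` delays on the client, inside the consumer, so they can be lost if the consumer is closed and recreated between deliveries, while the redelivery count travels with the message. Whether that is what happens here is an assumption; nothing in this post confirms it. One option that might be worth trying is the broker-side redelivery plugin, sketched below with the same values as the client policy in the question. The sketch assumes the broker is started with `schedulerSupport="true"` (the question has it set to `false`) and that the scheduler works alongside the JDBC persistence adapter in this setup, which has not been verified.

```xml
<!-- Sketch only: broker-side redelivery, values mirror the client-side policy from the question -->
<amq:broker brokerName="messageBroker" useJmx="true" persistent="true" start="true" schedulerSupport="true">
    <amq:plugins>
        <!-- Reschedules messages that would otherwise go to the dead-letter queue -->
        <amq:redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
            <amq:redeliveryPolicyMap>
                <amq:redeliveryPolicyMap>
                    <amq:defaultEntry>
                        <amq:redeliveryPolicy initialRedeliveryDelay="5000" maximumRedeliveries="5" useExponentialBackOff="true" backOffMultiplier="2"/>
                    </amq:defaultEntry>
                </amq:redeliveryPolicyMap>
            </amq:redeliveryPolicyMap>
        </amq:redeliveryPlugin>
    </amq:plugins>
    <!-- transportConnectors and persistenceAdapter as in the question -->
</amq:broker>
```

The plugin only acts once the client has given up on a message and it is headed for the dead-letter queue, so the client-side `maximumRedeliveries` would probably need to be lowered (for example to 0) for the broker-side delays to take over.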

Thanks for your time.