This is my first post so hopefully I don't mess the formatting up too badly.
I have a Spring Integration flow that routes a message as follows:
Gateway -> router -> outbound-jms-channel -> message-driven-channel -> service-activator
I'm using Spring Integration 4.0.3 and ActiveMQ 5.10.0. In this particular instance I'm running within a WebLogic container (although this will not always be the case).
The route posts a JMS message to an ActiveMQ queue. If a runtime exception is thrown during execution of the listener, I expect any transactional resources enlisted within the listener to be rolled back and the message to be redelivered. Currently I am seeing the rollback but no redelivery of the message.
My spring configuration is as follows:
<!-- TaskExecutor is defined elsewhere and is our own object, it's effectively the equivalent of calling Executors.newCachedThreadPool(). -->
<bean id="TransactionManager" class="org.springframework.transaction.jta.WebLogicJtaTransactionManager"/>
<bean id="XADataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="${xaJndiName}"/>
</bean>
<amq:broker brokerName="messageBroker" useJmx="true" persistent="true" start="true" schedulerSupport="false">
    <amq:transportConnectors>
        <amq:transportConnector id="tcpConnector" uri="tcp://garethDell.leeds.retailexp.com:9783"/>
    </amq:transportConnectors>
    <amq:persistenceAdapter>
        <amq:jdbcPersistenceAdapter dataSource="#XADataSource"/>
    </amq:persistenceAdapter>
</amq:broker>
<amq:xaConnectionFactory id="amqConnectionFactory" brokerURL="tcp://garethDell.leeds.retailexp.com:9783" useAsyncSend="false">
    <amq:prefetchPolicy>
        <amq:prefetchPolicy all="1"/>
    </amq:prefetchPolicy>
    <amq:redeliveryPolicy>
        <amq:redeliveryPolicy id="redeliveryPolicy" initialRedeliveryDelay="5000" maximumRedeliveries="5" useExponentialBackOff="true" backOffMultiplier="2" queue="*" />
    </amq:redeliveryPolicy>
</amq:xaConnectionFactory>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
    <property name="reconnectOnException" value="true"/>
</bean>
<int:gateway id="messageGateway" service-interface="com.retailexp.amp.server.spring.MessageGateway" default-request-channel="requestChannel"/>
<bean id="MessagePublisher" class="com.retailexp.amp.server.common.publishing.impl.MessagePublisher">
<property name="gateway" ref="messageGateway"/>
</bean>
<!-- Routes the messages to the appropriate destination, each queue / topic should have an appropriate channel associated with it which must be registered here-->
<int:channel id="requestChannel"/>
<int:header-value-router id="destinationRouter" input-channel="requestChannel" header-name="destination">
    <int:mapping value="test" channel="testOutChannel"/>
</int:header-value-router>
<int:channel id="testOutChannel"/>
<int-jms:outbound-channel-adapter id="testOut" channel="testOutChannel" connection-factory="connectionFactory" destination="testQueue"/>
<int:channel id="testListener"/>
<int-jms:message-driven-channel-adapter id="testIn" connection-factory="connectionFactory" container="testQueueContainer" channel="testListener"/>
<!-- Define the queues -->
<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test"/>
</bean>
<!-- Define the message listeners -->
<bean id="testMessageHandler" class="com.retailexp.amp.server.spring.TestListener"/>
<int:service-activator input-channel="testListener" ref="testMessageHandler" method="onMessage"/>
<bean id="testQueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="transactionManager" ref="TransactionManager"/>
    <property name="taskExecutor" ref="TaskExecutor"/>
    <property name="maxConcurrentConsumers" value="10"/>
    <property name="sessionTransacted" value="true"/>
    <property name="sessionAcknowledgeMode" value="0"/>
    <property name="destination" ref="testQueue"/>
    <property name="cacheLevelName" value="CACHE_CONSUMER"/>
</bean>
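As a rough sketch of what the redeliveryPolicy above should amount to if redelivery were actually happening, the snippet below works out the delays from the configured values. It assumes each delay is simply the previous one multiplied by backOffMultiplier, starting from initialRedeliveryDelay, with no maximum-delay cap or collision avoidance applied; the class and method names are made up for illustration and are not part of ActiveMQ.

```java
import java.util.ArrayList;
import java.util.List;

public class RedeliverySchedule {

    // Values copied from the redeliveryPolicy element in the configuration above.
    static final long INITIAL_REDELIVERY_DELAY_MS = 5000;
    static final int MAXIMUM_REDELIVERIES = 5;
    static final double BACK_OFF_MULTIPLIER = 2.0;

    // Assumption: delay for attempt n is the initial delay times multiplier^(n-1),
    // with no cap and no collision avoidance. This is illustrative arithmetic only.
    static List<Long> expectedDelays() {
        List<Long> delays = new ArrayList<>();
        long delay = INITIAL_REDELIVERY_DELAY_MS;
        for (int attempt = 1; attempt <= MAXIMUM_REDELIVERIES; attempt++) {
            delays.add(delay);
            delay = (long) (delay * BACK_OFF_MULTIPLIER);
        }
        return delays;
    }

    public static void main(String[] args) {
        // prints [5000, 10000, 20000, 40000, 80000]
        System.out.println(expectedDelays());
    }
}
```

Under those assumptions I would expect up to five redelivery attempts, spaced 5s, 10s, 20s, 40s and 80s apart, before the broker gives up on the message.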
My listener is very straightforward at this stage: it logs the message, saves one of our domain objects, and finally throws a runtime exception to force a rollback. The rollback appears in the logs and the database changes are correctly rolled back, so I know the transaction is being rolled back within the listener. I assume the consumer isn't enlisting in the XA transaction when it should be, so consumption of the message is committed in a separate transaction (or is somehow non-transactional).
I am seeing an error from the XA JDBC driver when TransactionContext tries to set the transaction isolation level. However, I don't think this is the cause of the problem: I have tried various other isolation levels (e.g. 8, TRANSACTION_SERIALIZABLE), so I think it's merely a limitation of the driver itself.
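To make the listener's behaviour concrete, here is a minimal standalone sketch of the log, save, throw sequence. It is not the real TestListener: the String payload parameter, the DomainObjectStore interface and the InMemoryStore class are hypothetical stand-ins so the example runs without Spring, JMS or a database. Only the method name onMessage is taken from the service-activator configuration.

```java
import java.util.ArrayList;
import java.util.List;

public class TestListenerSketch {

    // Hypothetical stand-in for the real persistence layer (not from the actual code base).
    interface DomainObjectStore {
        void save(String payload);
    }

    // Hypothetical in-memory store; unlike the real XA data source, it does not roll back.
    static class InMemoryStore implements DomainObjectStore {
        final List<String> saved = new ArrayList<>();

        @Override
        public void save(String payload) {
            saved.add(payload);
        }
    }

    private final DomainObjectStore store;

    TestListenerSketch(DomainObjectStore store) {
        this.store = store;
    }

    // Same method name as referenced by the service-activator; the parameter type is assumed.
    public void onMessage(String payload) {
        System.out.println("Received message: " + payload); // 1. log the message
        store.save(payload);                                 // 2. save a domain object
        throw new RuntimeException("Forcing rollback for: " + payload); // 3. force rollback
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        TestListenerSketch listener = new TestListenerSketch(store);
        try {
            listener.onMessage("hello");
        } catch (RuntimeException e) {
            System.out.println("Listener failed as intended: " + e.getMessage());
        }
    }
}
```

In the real application the save goes through the XA data source, so the exception undoes it; the in-memory store here keeps the saved value because nothing transactional is involved.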
Exception:
[11 Sep 2014 16:53:07,464] [ActiveMQ Transport: tcp:///192.168.50.100:58292@9783 ] [ ] [ ] TRACE jdbc.TransactionContext - Cannot set transaction isolation to 1 due Due to vendor limitations, setting transaction isolation for "Oracle XA" JDBC XA driver is not supported.. This exception is ignored.
java.sql.SQLException: Due to vendor limitations, setting transaction isolation for "Oracle XA" JDBC XA driver is not supported.
at weblogic.jdbc.wrapper.JTAConnection.setTransactionIsolation(JTAConnection.java:492)
at org.apache.activemq.store.jdbc.TransactionContext.getConnection(TransactionContext.java:74)
at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doAddMessage(DefaultJDBCAdapter.java:220)
at org.apache.activemq.store.jdbc.JDBCMessageStore.addMessage(JDBCMessageStore.java:126)
at org.apache.activemq.store.memory.MemoryTransactionStore.addMessage(MemoryTransactionStore.java:343)
at org.apache.activemq.store.memory.MemoryTransactionStore$1.asyncAddQueueMessage(MemoryTransactionStore.java:159)
at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:910)
at org.apache.activemq.broker.region.Queue.send(Queue.java:733)
at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:424)
at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:445)
at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:147)
at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:307)
at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:152)
at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:496)
at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:756)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
at java.lang.Thread.run(Thread.java:722)
I have trace-level logs available if they're required, but they're very verbose and were pushing me over the character limit for this post. Let me know if they'd be helpful and I'll try to make them available.
Can anyone see anything obviously wrong with the above configuration?
Edit: Trace log available here: http://pastebin.com/0awmzY2D
Thanks
Oracle XA limitation message. Anyway, the last one tells you that you can't do that. - Artem Bilan