I am trying to use Azure Service Bus with Apache Qpid JMS and Spring Integration, with transacted sessions.
However, it seems that the Azure Service Bus AMQP implementation doesn't support transactions. Is that true? I couldn't find any information about it.
This is my JMS config:
<bean id="serviceBusConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
    <constructor-arg value="amqps://${serviceBus.host}?amqp.idleTimeout=1200000"/>
    <property name="clientID" value="${serviceBus.clientId}"/>
    <property name="username" value="${serviceBus.sharedAccessPolicyName}"/>
    <property name="password" value="${serviceBus.sharedAccessPolicyKey}"/>
    <property name="receiveLocalOnly" value="true"/>
    <property name="receiveNoWaitLocalOnly" value="true"/>
</bean>
<bean id="jmsCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="serviceBusConnectionFactory"/>
</bean>
And this is my spring integration snippet:
<int-jms:inbound-channel-adapter id="resOrdJmsIn"
        destination-name="${serviceBus.destination-name}"
        channel="resOrdIncoming"
        connection-factory="jmsCachingConnectionFactory"
        acknowledge="client"
        session-transacted="true">
    <int:poller fixed-rate="1000"/>
</int-jms:inbound-channel-adapter>
It works with session-transacted="false", but with session-transacted="true" it produces this error:
2017-07-03 10:06:27.237 ERROR 21575 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler : org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: An AMQP error occurred (condition='amqp:internal-error'). [condition = amqp:internal-error]
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487)
at org.springframework.jms.core.JmsTemplate.receiveSelected(JmsTemplate.java:754)
at org.springframework.integration.jms.JmsDestinationPollingSource.doReceiveJmsMessage(JmsDestinationPollingSource.java:138)
at org.springframework.integration.jms.JmsDestinationPollingSource.receive(JmsDestinationPollingSource.java:111)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.jms.JMSException: An AMQP error occurred (condition='amqp:internal-error'). [condition = amqp:internal-error]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:148)
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:103)
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:167)
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:113)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:795)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:92)
at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
... 7 more
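For comparison, here is a sketch of the variant that polls without the error. It is the same adapter as above with only session-transacted switched to "false"; every id, placeholder and attribute is copied from my snippet, nothing new is introduced. I am showing it only to make clear that the failure appears to be tied to the transacted session and not to the connection or destination settings.

```xml
<!-- Same inbound adapter as above; the only change is session-transacted="false".
     With this setting messages are received and no amqp:internal-error is raised. -->
<int-jms:inbound-channel-adapter id="resOrdJmsIn"
        destination-name="${serviceBus.destination-name}"
        channel="resOrdIncoming"
        connection-factory="jmsCachingConnectionFactory"
        acknowledge="client"
        session-transacted="false">
    <int:poller fixed-rate="1000"/>
</int-jms:inbound-channel-adapter>
```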
Apache Qpid trace:
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_FINAL
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: SESSION_INIT
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: SESSION_LOCAL_OPEN
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: SESSION_REMOTE_OPEN
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.apache.qpid.jms.provider.amqp.FRAMES : SENT: Attach{name='qpid-jms:coordinator:ID:fe87e12c-1fdc-4e7c-8cf1-11861b90de19:1:5', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Coordinator{capabilities=[amqp:local-transactions]}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.q.j.t.netty.NettyTcpTransport : Attempted write of: 127 bytes
2017-07-03 10:37:29.328 TRACE 24726 --- [ntLoopGroup-2-1] o.a.q.j.t.netty.NettyTcpTransport : New data read: 103 bytes incoming: UnpooledHeapByteBuf(ridx: 0, widx: 103, cap: 165)
2017-07-03 10:37:29.329 DEBUG 24726 --- [us.windows.net]] o.a.qpid.proton.engine.impl.SaslImpl : SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT] about to call plain input
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.apache.qpid.jms.provider.amqp.FRAMES : RECV: Attach{name='qpid-jms:coordinator:ID:fe87e12c-1fdc-4e7c-8cf1-11861b90de19:1:5', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=null, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=18446744073709551615, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_INIT
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_LOCAL_OPEN
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider : New Proton Event: LINK_REMOTE_OPEN
2017-07-03 10:37:29.432 TRACE 24726 --- [ntLoopGroup-2-1] o.a.q.j.t.netty.NettyTcpTransport : New data read: 103 bytes incoming: UnpooledHeapByteBuf(ridx: 0, widx: 103, cap: 165)
2017-07-03 10:37:29.433 DEBUG 24726 --- [us.windows.net]] o.a.qpid.proton.engine.impl.SaslImpl : SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT] about to call plain input