I've defined a JMS Inbound Channel Adapter as follows in Spring Integration 3.0.1.RELEASE:
<int-jms:inbound-channel-adapter channel="inChannel" phase="1000"
destination-name="jmsQueue" extract-payload="true"
connection-factory="connectionFactory">
<int:poller max-messages-per-poll="1" fixed-rate="1000"/>
</int-jms:inbound-channel-adapter>
But messages are consumed from the JMS broker at irregular, unpredictable intervals, ranging from several seconds to several minutes between consecutive messages. I've tried fixed-delay instead of fixed-rate, but the behavior is the same.
What other factor could be causing the polls to run at irregular times, and how can I achieve reliable polling intervals?
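For reference, this is how I understand the difference between the two poller settings, written as a small simulation. It is only a sketch under my own assumptions: a single polling thread, a fixed-rate poll scheduled relative to the previous scheduled start, and a fixed-delay poll scheduled relative to the previous completion. The class and method names (PollerTimingSketch, startTimes) are made up for illustration and are not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.List;

public class PollerTimingSketch {

    /**
     * Simulated start time (ms) of each poll, given how long each poll takes.
     * Simplified single-threaded model; not Spring's actual scheduler.
     */
    public static List<Long> startTimes(long periodMs, boolean fixedRate, long[] pollDurationsMs) {
        List<Long> starts = new ArrayList<>();
        long scheduled = 0; // when the trigger wants the next poll to start
        long now = 0;       // simulated clock: when the thread becomes free
        for (long duration : pollDurationsMs) {
            // A poll that is already overdue starts as soon as the thread is free.
            long start = Math.max(scheduled, now);
            starts.add(start);
            long completion = start + duration;
            now = completion;
            // fixed-rate: relative to the previous scheduled start
            // fixed-delay: relative to the previous completion
            scheduled = fixedRate ? scheduled + periodMs : completion + periodMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] durations = {100, 2500, 100, 100}; // hypothetical poll durations in ms
        System.out.println("fixed-rate : " + startTimes(1000, true, durations));
        System.out.println("fixed-delay: " + startTimes(1000, false, durations));
    }
}
```

In this model a slow poll shifts later polls by roughly its own duration, so neither setting on its own would seem to account for gaps of several minutes.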
EDIT:
I've confined the application to a single default poller and a single JMS Inbound Channel Adapter (although there are also some Message Driven Channel Adapters), and the behaviour is still the same. I've tweaked the timings to a fixed-delay of 3000 and a receive-timeout of 5000.
I started the application with some messages already waiting on the JMS queue. The logs show entries like the following, with the poll moving to a different task-scheduler thread after a few callbacks:
2016-09-23 18:48:25,592 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:18:1,started=true}
2016-09-23 18:48:25,630 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false'
2016-09-23 18:48:28,639 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:19:1,started=true}
2016-09-23 18:48:28,643 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false'
2016-09-23 18:48:31,651 | DEBUG | ask-scheduler-3 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:20:1,started=true}
2016-09-23 18:48:31,657 | DEBUG | ask-scheduler-3 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false'
2016-09-23 18:48:34,666 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:21:1,started=true}
2016-09-23 18:48:34,670 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false'
And then, 10 minutes later:
2016-09-23 18:58:10,032 | DEBUG | ask-scheduler-8 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:212:1,started=true}
2016-09-23 18:58:10,091 | DEBUG | ask-scheduler-8 | ion.endpoint.SourcePollingChannelAdapter | Poll resulted in Message:
And the message is consumed.
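To put numbers on the gaps between polls, the log can be reduced to the intervals between consecutive "Executing callback on JMS Session" entries. Below is a minimal sketch of that, assuming every line starts with a 23-character timestamp in the format shown above. The class name PollGapAnalyzer and the method gapsMillis are hypothetical helpers of mine, and the sample lines are shortened copies of the entries above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PollGapAnalyzer {

    // Timestamp layout used by the log lines above, e.g. "2016-09-23 18:48:25,592"
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");
    private static final int TS_LENGTH = 23;
    private static final String POLL_MARKER = "Executing callback on JMS Session";

    /** Milliseconds between each pair of consecutive poll-start log entries. */
    public static List<Long> gapsMillis(List<String> logLines) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : logLines) {
            // Only lines that mark the start of a poll are of interest.
            if (line.length() < TS_LENGTH || !line.contains(POLL_MARKER)) {
                continue;
            }
            LocalDateTime current = LocalDateTime.parse(line.substring(0, TS_LENGTH), TS);
            if (previous != null) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "2016-09-23 18:48:31,651 | DEBUG | ask-scheduler-3 | Executing callback on JMS Session",
                "2016-09-23 18:48:34,666 | DEBUG | ask-scheduler-1 | Executing callback on JMS Session",
                "2016-09-23 18:58:10,032 | DEBUG | ask-scheduler-8 | Executing callback on JMS Session");
        System.out.println(gapsMillis(sample));
    }
}
```

Run against the full log, this should make it easy to see where the interval stops tracking the configured fixed-delay of 3000.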
I've taken multiple thread dumps and could find only one instance of a task-scheduler thread in the RUNNABLE state:
"task-scheduler-4" prio=6 tid=0x000000001074f800 nid=0x4364 runnable [0x000000001d4fe000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:167)
at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237)
- locked <0x00000007dc803080> (a java.util.concurrent.atomic.AtomicBoolean)
at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
- locked <0x00000007dc8031f8> (a java.lang.Object)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
at org.apache.activemq.ActiveMQSession.doClose(ActiveMQSession.java:590)
at org.apache.activemq.ActiveMQSession.close(ActiveMQSession.java:581)
at org.springframework.jms.support.JmsUtils.closeSession(JmsUtils.java:108)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:497)
at org.springframework.jms.core.JmsTemplate.receiveSelected(JmsTemplate.java:761)
at org.springframework.integration.jms.JmsDestinationPollingSource.doReceiveJmsMessage(JmsDestinationPollingSource.java:118)
at org.springframework.integration.jms.JmsDestinationPollingSource.receive(JmsDestinationPollingSource.java:93)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:111)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:184)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:141)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:273)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
All the other thread dumps show every task-scheduler thread in either WAITING or TIMED_WAITING, as in the following (including the thread from the previous dump, after it finished). This dump is from 30 seconds after the last one:
"task-scheduler-4" prio=6 tid=0x00000000118d3800 nid=0x4abc waiting on condition [0x000000000f8bf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007838d74d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
"task-scheduler-3" prio=6 tid=0x00000000118d4800 nid=0x4f14 waiting on condition [0x000000001ba0f000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000787c10210> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
Any clue?