I've defined a JMS Inbound Channel Adapter as follows in Spring Integration 3.0.1.RELEASE:

<int-jms:inbound-channel-adapter channel="inChannel" phase="1000"
                                 destination-name="jmsQueue" extract-payload="true"
                                 connection-factory="connectionFactory">
    <int:poller max-messages-per-poll="1" fixed-rate="1000"/>
</int-jms:inbound-channel-adapter>

But messages are consumed from the JMS broker at irregular, unpredictable intervals, ranging from several seconds to several minutes between consecutive messages. I've tried fixed-delay instead of fixed-rate, but the behavior is the same.

What else could be causing the polls to run at irregular times, and how can I get a reliable polling interval?

EDIT:

I've confined the application to a single default poller and a single JMS Inbound Channel Adapter (although there are still some Message Driven Channel Adapters), and it still shows the same behavior. I've tweaked the timings to a fixed-delay of 3000 and a receive-timeout of 5000.

I started the application with some messages already waiting on the JMS queue. The logs show entries like the following, with the poll moving to a different task-scheduler thread after a few callback operations:

2016-09-23 18:48:25,592 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:18:1,started=true} 
2016-09-23 18:48:25,630 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 
2016-09-23 18:48:28,639 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:19:1,started=true} 
2016-09-23 18:48:28,643 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 
2016-09-23 18:48:31,651 | DEBUG | ask-scheduler-3 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:20:1,started=true} 
2016-09-23 18:48:31,657 | DEBUG | ask-scheduler-3 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 
2016-09-23 18:48:34,666 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:21:1,started=true} 
2016-09-23 18:48:34,670 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 

And then, 10 minutes later:

2016-09-23 18:58:10,032 | DEBUG | ask-scheduler-8 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:212:1,started=true}
2016-09-23 18:58:10,091 | DEBUG | ask-scheduler-8 | ion.endpoint.SourcePollingChannelAdapter | Poll resulted in Message:

And the message is consumed.

I've taken multiple thread dumps, and could find only one instance of a task-scheduler thread in the RUNNABLE state:

"task-scheduler-4" prio=6 tid=0x000000001074f800 nid=0x4364 runnable [0x000000001d4fe000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:167)
    at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237)
    - locked <0x00000007dc803080> (a java.util.concurrent.atomic.AtomicBoolean)
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
    - locked <0x00000007dc8031f8> (a java.lang.Object)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
    at org.apache.activemq.ActiveMQSession.doClose(ActiveMQSession.java:590)
    at org.apache.activemq.ActiveMQSession.close(ActiveMQSession.java:581)
    at org.springframework.jms.support.JmsUtils.closeSession(JmsUtils.java:108)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:497)
    at org.springframework.jms.core.JmsTemplate.receiveSelected(JmsTemplate.java:761)
    at org.springframework.integration.jms.JmsDestinationPollingSource.doReceiveJmsMessage(JmsDestinationPollingSource.java:118)
    at org.springframework.integration.jms.JmsDestinationPollingSource.receive(JmsDestinationPollingSource.java:93)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:111)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:184)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:141)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:273)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

All other thread dumps show every task-scheduler thread in either WAITING or TIMED_WAITING, as in the following (including the thread from the previous dump, after it finished). This dump was taken 30 seconds after the previous one:

"task-scheduler-4" prio=6 tid=0x00000000118d3800 nid=0x4abc waiting on condition [0x000000000f8bf000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007838d74d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

"task-scheduler-3" prio=6 tid=0x00000000118d4800 nid=0x4f14 waiting on condition [0x000000001ba0f000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000787c10210> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

Any clue?

1 Answer

If you have a lot of pollers in your application, you could be suffering from thread starvation. The default task scheduler has only 10 threads; you can increase the pool size.
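
One way to increase the thread count, sketched here under assumptions: declare your own scheduler bean with the id taskScheduler, which Spring Integration then uses in place of the one it would otherwise create. The pool size of 20 is an arbitrary illustrative value, not a recommendation; pick it based on how many pollers you have and how long each one blocks.

```xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/task
           http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- The id must be "taskScheduler" so that it replaces the default scheduler. -->
    <!-- pool-size="20" is an assumed value chosen only for illustration. -->
    <task:scheduler id="taskScheduler" pool-size="20"/>

</beans>
```

If the irregular polling disappears with a larger pool, that points to starvation; if it does not, the cause is probably elsewhere.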

Pollers that poll QueueChannels block for up to 1 second by default (the poller's receive-timeout), and each one holds a scheduler thread while it waits.

Generally, turning on DEBUG logging for org.springframework.integration helps resolve issues such as this.
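
As a rough sketch of what that could look like, assuming Logback is the logging backend (the question does not say which one is in use, so adapt this to your framework), a logback.xml along these lines enables DEBUG for the Spring Integration packages only. The appender name and the output pattern are illustrative choices; including the thread name in the pattern makes it easy to see which task-scheduler thread ran each poll.

```xml
<configuration>
    <!-- Illustrative console appender; the name and pattern are assumptions. -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!-- timestamp | level | thread | logger | message -->
            <pattern>%d | %-5level | %thread | %logger{40} | %msg%n</pattern>
        </encoder>
    </appender>

    <!-- DEBUG only for Spring Integration, to keep the rest of the log quiet. -->
    <logger name="org.springframework.integration" level="DEBUG"/>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
```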

Taking a thread dump and looking at the task scheduler thread activity should also help.