1
votes

I have the following configuration for consuming messages from a queue. I need the task executor to perform only one task at a time, so I have configured it as follows.

<bean name="jmsTaskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="1" />
    <property name="maxPoolSize" value="2" />
</bean>

When I configure my task Executor as above, 10 messages are getting consumed at a time (there is a huge flow of messages in the queue) and the container then stops listening for messages for almost 10-15 minutes. My container is configured as follows:

<bean id="queueContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachedConnectionFactory" />
    <property name="destination" ref="queue" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="idleTaskExecutionLimit" value="1" />
    <property name="idleConsumerLimit" value="5" />
    <property name="receiveTimeout" value="10000" />
    <property name="recoveryInterval" value="10000" />
    <property name="taskExecutor" ref="jmsTaskExecutor" />
    <property name="messageListener" ref="queueListener" />
    <property name="autoStartup" value="true" />
</bean>

After a bit of googling, I tried using SyncTaskExecutor instead of ThreadPoolTaskExecutor and configured my taskExecutor as follows:

<bean name="jmsTaskExecutor"
            class="org.springframework.core.task.SyncTaskExecutor" />

but that is causing a memory leak in Tomcat.

Can you please let me know how I can consume messages so that the next message is handed to the task only after the current task has completed?

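The queue listener code is as follows:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.listener.SessionAwareMessageListener;

public class QueueListener implements SessionAwareMessageListener<Message> {
    @Override
    public void onMessage(Message msg, Session ses) throws JMSException {
        ....
        ....
        ....
    }
}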
1
Can you post your queueListener code and the remaining XML config? Also, what is your prefetch size? - 2020
I am not setting the prefetch size anywhere explicitly, and I have posted my queueListener code. Can you please check now? - Sai Kumar

1 Answer

0
votes

When I configure my task Executor as above, 10 messages are getting consumed at a time

That suggests that your prefetch size is 10. So, even if you have only one thread processing the messages, that thread's consumer takes in 10 messages at a time. If you run with a prefetch size of 0 or 1 (depending on the broker implementation), you will get the "one message at a time" behavior that you want.

For example, if you are using ActiveMQ, you can look at this link on setting the prefetch size.
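As a rough sketch only, and assuming the broker really is ActiveMQ (the question does not say), the queue prefetch can be lowered on the client side through the connection URI. The bean name amqConnectionFactory and the address tcp://localhost:61616 are placeholders, not values from the question; the existing cachedConnectionFactory would presumably need to wrap a factory configured along these lines.

```xml
<!-- Hypothetical bean name and broker address; adjust to your environment. -->
<bean id="amqConnectionFactory"
    class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- jms.prefetchPolicy.queuePrefetch=1 asks the broker to dispatch
         at most one unacknowledged queue message to each consumer. -->
    <property name="brokerURL"
        value="tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1" />
</bean>
```

Other brokers expose prefetch differently, so treat this as an illustration of the idea rather than a drop-in fix.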

If your message processing is slow, then we need to see more of your "onMessage" code to tell where the time is being spent.