6
votes

I have configured Spring DefaultMessageListenerContainer as ActiveMQ consumer consuming messages from a queue. Let's call it "Test.Queue" I have this code deployed in 4 different machines and all the machines are configured to the same ActiveMQ instance to process the messages from the same "Test.Queue" queue.

I set the max consumer size to 20 as soon as all the 4 machines are up and running, I see the number of consumers count against the queue as 80 (4 * max consumer size = 80)

Everything is fine when the messages produced and sent to the queue grows high.

When there are 1000's of messages and among the 80 consumers, let's say one of them is stuck it puts a freeze on Active MQ to stop sending messages to other consumers.

All messages are stuck in ActiveMQ forever.

As I have 4 machines with up to 80 consumers , I have no clue as to see which consumer failed to acknowledge.

I go stop and restart all the 4 machines and when I stop the machine that has the bad consumer which got stuck, then messages starts flowing again.

I don't know how to configure DefaultMessageListenerContainer to abandon the bad consumer and signal ActiveMQ immediately to start sending messages.

I was able to create the scenario even without Spring as follows:

  1. I produced up to 5000 messages and sent them to the "Test.Queue" queue
  2. I created 2 consumers (Consumer A, B) and in one consumer B's onMessage() method, I put the thread to sleep for a long time ( Thread.sleep(Long.MAX_VALUE)) having the condition like when current time % 13 is 0 then put the thread to sleep.

  3. Ran these 2 consumers.

  4. Went to Active MQ and found that the queue has 2 consumers.
  5. Both A and B are processing messages
  6. At some point of time consumer B's onMessage() gets called and it puts the Thread to sleep when the condition of current time % 13 is 0 is satisified.
  7. The consumer B is stuck and it can't acknowledge to the broker
  8. I went back to Active MQ web console, still see the consumers as 2, but no messages are dequeued.
  9. Now I created another consumer C and ran it to consume.
  10. Only the consumer count in ActiveMQ went up to 3 from 2.
  11. But Consumer C is not consuming anything as the broker failed sending any messages holding them all as it is still waiting for consumer B to acknowledge it.
  12. Also I noticed Consumer A is not consuming anything
  13. I go and kill consumer B , now all messages are drained.

Let's say A, B, C are managed by Spring's DefaultMessageListenerContainer, how do I tweak Spring DefaultMessageListenerContainer to take that bad consumer off the pool (in my case consumer B) after it failed to acknowledge for X number of seconds, acknowledge the broker immediately so that the broker is not holding onto messages forever.

Thanks for your time.

Appreciate if I get a solution to this problem.

1
Maybe you should change the title of the question to indicate it is an activemq problemgkamal
Is there any specific configuration you have done for the queue or is it just the default configuration? For e.g., to ensure ordering of messagegkamal
@gkamal Do you think it is ActiveMQ problem? How do we get around the problem as it stops sending messages to other consumers when one of them is stuck and failed to acknowledge the broker? Is there a workaround for the problem?serverfaces
@gkamal <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="myConnectionFactory" /> <property name="destination" ref="myDestination" /> <property name="messageListener" ref="messageConsumer" /> <property name="cacheLevelName" value="CACHE_CONNECTION" /> <property name="maxConcurrentConsumers" value="20" /> <property name="receiveTimeout" value="10000" /> <property name="maxMessagesPerTask" value="20" /> <property name="idleTaskExecutionLimit" value="5" /> </bean>serverfaces

1 Answers

3
votes

here are a few options to try...

  1. set the queue prefetch to 0 to promote better distribution across consumers and reduce 'stuck' messages on specific consumers. see http://activemq.apache.org/what-is-the-prefetch-limit-for.html

  2. set "?useKeepAlive=false&wireFormat.maxInactivityDuration=20000" on the connection to timeout the slow consumer after a specified inactive time

  3. set the queue policy "slowConsumerStrategy->abortSlowConsumer"...again to timeout a slow consumer

    <policyEntry ...
      ...
      <slowConsumerStrategy>
          <abortSlowConsumerStrategy />
      </slowConsumerStrategy>
      ...
    </policyEntry>