2
votes

Hi, I am learning Spring JMS with ActiveMQ. In my example scenario, a Producer application sends around 50 messages to a queue, and when I start the Consumer application it begins consuming those messages.

Now I want multiple consumer threads to consume messages from the queue. I am using the JMS listener-container. When I searched for this, I found that there is a concurrency attribute.

According to the Spring JMS documentation, the concurrency attribute specifies:

The number of concurrent sessions/consumers to start for each listener. Can either be a simple number indicating the maximum number (e.g. "5") or a range indicating the lower as well as the upper limit (e.g. "3-5"). Note that a specified minimum is just a hint and might be ignored at runtime. Default is 1; keep concurrency limited to 1 in case of a topic listener or if queue ordering is important; consider raising it for general queues.

In my configuration I set this attribute to 5, but it does not seem to start 5 concurrent listeners.

Configuration for listener:

consumer-applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"

    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jms
    http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616" />

    <bean id="listener" class="com.jms.example.MyMessageListener"></bean>

    <jms:listener-container container-type="default" concurrency="5"
        connection-factory="connectionFactory">
        <jms:listener destination="MyQueue" ref="listener"
            method="onMessage"></jms:listener>
    </jms:listener-container>

</beans>

If I use a DefaultMessageListenerContainer bean instead of jms:listener-container, with these properties:

<bean id="msgListenerContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"
        p:connectionFactory-ref="connectionFactory"
        p:destination-ref="destination"
        p:messageListener-ref="listener"
        p:concurrentConsumers="10"
        p:maxConcurrentConsumers="50" />

then the ActiveMQ console shows 10 consumers, but in practice only 3 consumers process messages simultaneously, or sometimes 6, or only 1.

EDIT:

Consumer code:

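import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message m) {
        TextMessage message = (TextMessage) m;
        try {
            System.out.println("Start = " + message.getText());
            Thread.sleep(5000); // simulate 5 seconds of processing per message
            System.out.println("End = " + message.getText());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}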

I print each consumed message to the console. The output for each scenario is shown below:

OBSERVATION:

I observed some odd behavior. My producer and consumer are two independent applications.

Scenario - 1:

  1. I start the producer and send messages (the consumer is NOT running at this point).
  2. Then I start the consumer to consume the messages.

The problem here is that it does not start all 10 consumers. Sometimes it starts 3, sometimes only 1.

Start = hello jms 1 // consumer 1 started 
Start = hello jms 2 // consumer 2 started 
Start = hello jms 3 // consumer 3 started 
End = hello jms 1  //  consumer 1 ended
Start = hello jms 4 // consumer 4 started and hence always 3 consumers and not 10
End = hello jms 2
Start = hello jms 5
End = hello jms 3
Start = hello jms 6

Scenario - 2:

  1. I start the producer and send messages (the consumer is already running).
  2. Since the consumer is running, it starts consuming them immediately.

In this case it does start all 10 consumers as expected, so the output is:

Start = hello jms 1 // consumer 1 started 
Start = hello jms 2 // consumer 2 started 
Start = hello jms 3 // consumer 3 started 
Start = hello jms 4 // consumer 4 started 
Start = hello jms 5 // consumer 5 started 
Start = hello jms 6 // consumer 6 started 
Start = hello jms 7 // consumer 7 started 
Start = hello jms 8 // consumer 8 started 
Start = hello jms 9 // consumer 9 started 
Start = hello jms 10 // consumer 10 started. Hence all them started at same time as expected.
End = hello jms 1
Start = hello jms 11
End = hello jms 2
Start = hello jms 12
End = hello jms 3
Start = hello jms 13

Why is this happening? It is really puzzling me. I don't want to keep the consumer running forever; I want to keep the two applications detached.

Please help.

3
What do you mean by "in reality it starts 3 consumers simultaneously"? If you can see 10 consumers in the AMQ console, then you have 10 consumers. Try giving it a steady load over time and you will notice that all 10 consumers get load. - Petter Nordlander
@PetterNordlander I will update my question to make it clearer. - mahendra kawde
@PetterNordlander Please see my edit. In my consumer code I print the text of each received message, and from the output it looks like 3 consumers were started simultaneously, not 10. - mahendra kawde
@PetterNordlander Please see my observations in the question above. I have updated my question. - mahendra kawde
@mahendra If you specify only one number, it is treated as the maximum number of consumers. Fewer than the maximum are started unless Spring JMS decides it needs more. Try specifying a range with a minimum value and give your consumers a steady load (not just 50 messages), and you will see all consumers working, up to the specified maximum. - Strelok
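
To make the "single number means maximum" point concrete, here is a minimal plain-Java sketch of how a concurrency string can be split into a lower and an upper limit. It is modeled on my understanding of how DefaultMessageListenerContainer interprets the attribute (a single number leaves the minimum at 1); the class and method names are hypothetical and this is not Spring's actual code.

```java
// Hypothetical illustration class; not part of Spring.
final class ConcurrencySketch {

    /** Lower and upper consumer limits derived from a concurrency string. */
    static final class Limits {
        final int min;
        final int max;

        Limits(int min, int max) {
            this.min = min;
            this.max = max;
        }
    }

    /** "5" is read as max only (min stays 1); "3-5" is read as min 3, max 5. */
    static Limits parse(String concurrency) {
        int dash = concurrency.indexOf('-');
        if (dash != -1) {
            int min = Integer.parseInt(concurrency.substring(0, dash).trim());
            int max = Integer.parseInt(concurrency.substring(dash + 1).trim());
            return new Limits(min, max);
        }
        // Assumption: a single number only raises the upper limit.
        return new Limits(1, Integer.parseInt(concurrency.trim()));
    }

    public static void main(String[] args) {
        Limits single = parse("5");
        Limits range = parse("3-5");
        System.out.println("\"5\"   -> min " + single.min + ", max " + single.max);
        System.out.println("\"3-5\" -> min " + range.min + ", max " + range.max);
    }
}
```

Under this reading, concurrency="5" starts with one consumer and scales up only when the container sees demand, while concurrency="5-5" would ask for five consumers from the start.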

3 Answers

1
votes

As Strelok pointed out, the cause was message prefetching. I created a prefetchPolicy bean with its queuePrefetch property set to 1 and referenced it from the connectionFactory.

The updated configuration is below:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"

    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jms
    http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"
        p:prefetchPolicy-ref="prefetchPolicy" />

    <bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy"
        p:queuePrefetch="1" />

    <bean id="listener" class="com.javatpoint.MyMessageListener"></bean>

    <jms:listener-container concurrency="10-15" connection-factory="connectionFactory">
        <jms:listener destination="javatpointQueue" ref="listener"
            method="onMessage"></jms:listener>
    </jms:listener-container>

    <!-- The JMS destination -->

      <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="javatpointQueue" />
      </bean>
</beans>
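
For readers using Java configuration, a rough equivalent of the XML above might look like the sketch below. It assumes the javax.jms API (as used by Spring 3.x and ActiveMQ 5.x) and that a MessageListener bean such as MyMessageListener is registered elsewhere; the class name ConsumerConfig is hypothetical.

```java
import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

// Hypothetical configuration class mirroring the XML beans above.
@Configuration
public class ConsumerConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Let each consumer buffer only one queue message at a time.
        ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
        policy.setQueuePrefetch(1);
        factory.setPrefetchPolicy(policy);
        return factory;
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainer(
            ConnectionFactory connectionFactory, MessageListener listener) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName("javatpointQueue");
        container.setMessageListener(listener);
        // Same range as the XML: at least 10, at most 15 consumers.
        container.setConcurrency("10-15");
        return container;
    }
}
```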
1
votes

I just ran into this problem in a Spring Boot 1.5.9 application.

As pointed out by @Strelok and @mahendra kawde, the issue is caused by the prefetchPolicy setting. The default queue prefetch value is 1000.

Large prefetch values are recommended for high performance with high message volumes. However, for lower message volumes, where each message takes a long time to process, the prefetch should be set to 1. This ensures that a consumer is only processing one message at a time. Specifying a prefetch limit of zero, however, will cause the consumer to poll for messages, one at a time, instead of the message being pushed to the consumer.

Take a look at http://activemq.apache.org/what-is-the-prefetch-limit-for.html
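
To see why a large prefetch starves the other consumers when the queue is already full at startup, here is a small plain-Java simulation. It uses a deliberately simplified model (consumers subscribe one after another, and each is immediately handed pending messages up to its prefetch limit); this is an assumption for illustration, not ActiveMQ's actual dispatch algorithm, and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration class; a simplified model, not ActiveMQ code.
final class PrefetchSketch {

    /**
     * Returns how many consumers hold at least one message after all of
     * them have subscribed to a queue that already contains messages.
     */
    static int busyConsumers(int queuedMessages, int consumers, int prefetch) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 1; i <= queuedMessages; i++) {
            queue.add(i);
        }
        int busy = 0;
        for (int c = 0; c < consumers; c++) {
            // Each newly subscribed consumer buffers up to 'prefetch' messages.
            int taken = 0;
            while (taken < prefetch && !queue.isEmpty()) {
                queue.poll();
                taken++;
            }
            if (taken > 0) {
                busy++;
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        System.out.println("prefetch 1000: " + busyConsumers(50, 10, 1000) + " busy consumer(s)");
        System.out.println("prefetch 1:    " + busyConsumers(50, 10, 1) + " busy consumer(s)");
    }
}
```

In this model, 50 queued messages and a prefetch of 1000 leave everything buffered in the first consumer, while a prefetch of 1 gives each of the 10 consumers something to process right away.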

The prefetchPolicy setting can be changed in any of the following ways:

  1. In application.properties file (working example)

    spring.activemq.broker-url=tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
    
  2. In DefaultMessageListenerContainer by modifying destinationName parameter (working example)

    <bean id="cons-even" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="destinationName" value="queue-name?consumer.prefetchSize=1"/>
      ...
    </bean>
    
  3. In ConnectionFactory bean (working example):

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
        policy.setQueuePrefetch(1);
        factory.setPrefetchPolicy(policy);
        return factory;
    }
    

Related topics:

  1. How do I make Spring JMSListener burst to max concurrent threads?
  2. Dynamic scaling of JMS consumer in spring boot
0
votes

JMS listeners can run concurrently. Below is a sample snippet that sets concurrentConsumers to 100.

Spring JMS Documentation

<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrentConsumers">
            <value>100</value>
        </property>
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queue" />
        <property name="messageListener" ref="messageListener" />
        <property name="sessionTransacted" value="false" />
        <property name="sessionAcknowledgeMode" value="1" />
    </bean>