2
votes

We have an ActiveMQ / Camel configuration that until now has used message queues exclusively, with concurrent consumers.

However, we're now introducing message topics, and finding that, because of the concurrent consumers, messages received on a topic are consumed multiple times.

What's the correct configuration for this scenario?

I.e., we want multiple concurrent consumers for messages received on a queue, but only a single consumer for messages received on a topic.

Here's the current configuration:

<amq:connectionFactory id="amqConnectionFactory"
    useAsyncSend="true" brokerURL="${${ptl.Servername}.jms.cluster.uri}"
    userName="${jms.username}" password="${jms.password}" sendTimeout="1000"
    optimizeAcknowledge="true" disableTimeStampsByDefault="true">
</amq:connectionFactory>

<bean id="cachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="cacheConsumers" value="true"></property>
    <property name="cacheProducers" value="true"></property>
    <property name="reconnectOnException" value="true"></property>
    <property name="sessionCacheSize" value="${jms.sessioncachesize}"></property>
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="transacted" value="false" />
    <property name="concurrentConsumers" value="${jms.concurrentConsumer}" />
    <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" />
    <property name="preserveMessageQos" value="true" />
    <property name="timeToLive" value="${jms.timeToLive}" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" />
</bean>

2 Answers

3
votes

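You can explicitly set concurrentConsumers/maxConcurrentConsumers to "1" on any topic consumer. Each concurrent consumer on a topic is a separate subscriber that receives its own copy of every message, which is why the messages are processed multiple times.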

from("activemq:topic:myTopic?concurrentConsumers=1&maxConcurrentConsumers=1")...  

Alternatively, set the JmsConfiguration concurrentConsumers/maxConcurrentConsumers properties to "1" and then explicitly enable concurrent consumption on individual queue endpoints as needed.

from("activemq:queue:myQueue?maxConcurrentConsumers=5")...  
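A rough Spring XML sketch of that alternative, reusing the `jmsConfig` and `cachingConnectionFactory` bean ids from the question. The queue name and the `myQueueHandler` bean are placeholders, not part of the original setup. Note that `&` in an endpoint URI has to be written as `&amp;` inside XML.

```xml
<!-- Default every endpoint to a single consumer, which is safe for topics -->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="concurrentConsumers" value="1" />
    <property name="maxConcurrentConsumers" value="1" />
</bean>

<!-- Opt in to concurrency per queue endpoint (hypothetical queue and bean names) -->
<camel:route id="myQueueWorker">
    <camel:from uri="activemq:queue:myQueue?concurrentConsumers=2&amp;maxConcurrentConsumers=5" />
    <camel:to uri="bean:myQueueHandler?method=handle" />
</camel:route>
```

With this layout a newly added topic route is single-consumer by default, so forgetting an option cannot reintroduce the duplicates.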

Also, you can use Virtual Topics to consume topic messages concurrently without getting duplicates (highly recommended over traditional topics).
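A minimal sketch of the Virtual Topic approach, assuming the broker's default naming convention: publish to a topic named `VirtualTopic.<name>` and consume from a queue named `Consumer.<consumerName>.VirtualTopic.<name>`. The names `stockQuotes`, `quoteSaver`, `stockQuoteResponder` and the `direct:publishQuote` endpoint are illustrative only.

```xml
<!-- Publisher: sends to the virtual topic as if it were an ordinary topic -->
<camel:route id="quotePublisher">
    <camel:from uri="direct:publishQuote" />
    <camel:to uri="activemq:topic:VirtualTopic.stockQuotes" />
</camel:route>

<!-- Subscriber: reads from its own queue, so concurrent consumers share the load -->
<camel:route id="quoteSaver">
    <camel:from uri="activemq:queue:Consumer.quoteSaver.VirtualTopic.stockQuotes?concurrentConsumers=5" />
    <camel:to uri="bean:stockQuoteResponder?method=saveStockQuote" />
</camel:route>
```

The broker copies each published message into every matching `Consumer.*` queue, so each logical subscriber sees a message once while the consumers on that queue compete for it.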

0
votes

The solution I ended up using was to create a separate JmsConfiguration / ActiveMQComponent pair for topics.

The full configuration looks as follows:

<!-- This config is appropriate for consuming Queues, but not Topics. For Topics, use
jmsTopicConfig / activemqTopics. -->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="transacted" value="false" />
    <property name="concurrentConsumers" value="${jms.concurrentConsumer}" />
    <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" />
    <property name="preserveMessageQos" value="true" />
    <property name="timeToLive" value="${jms.timeToLive}" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" />
</bean>

<!-- This config limits to a single concurrent consumer.  This config is appropriate for
consuming Topics, not Queues. -->
<bean id="jmsTopicConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="transacted" value="false" />
    <property name="concurrentConsumers" value="1" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="preserveMessageQos" value="true" />
    <property name="timeToLive" value="${jms.timeToLive}" />
</bean>

<bean id="activemqTopics" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsTopicConfig" />
</bean>

Then, in the Camel route, consume the topic through the activemqTopics component, as follows:

<camel:route id="myTopicResponder">
    <camel:from uri="activemqTopics:topic:stockQuotes?concurrentConsumers=1" />
    <camel:to uri="bean:stockQuoteResponder?method=saveStockQuote"/>
</camel:route>
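For comparison, a queue route would presumably keep using the original `activemq` component and so pick up the concurrent consumer settings from `jmsConfig`. The queue name and handler bean below are hypothetical.

```xml
<!-- The "activemq:" prefix selects the component backed by jmsConfig (concurrent consumers) -->
<camel:route id="stockOrderWorker">
    <camel:from uri="activemq:queue:stockOrders" />
    <camel:to uri="bean:stockOrderHandler?method=handleOrder" />
</camel:route>
```

The component name at the start of the URI is what decides which JmsConfiguration applies, so the queue/topic split is visible in each route.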