My app is a Spring-based application that uses ActiveMQ as its broker. It manages two different queues that receive messages.

For each queue, the app listens for messages on the broker, processes each message (reads it, performs database actions based on the information it carries, etc.), and then moves on to the next message. Processing is therefore synchronous: I want to handle messages in the order they arrive.

My current design is the following:

I create a thread:

<bean id="pollThread" class="my.app.receiver" init-method="start" destroy-method="interrupt">
</bean>

The thread's run() method calls, in a while(true) loop, a method that does the following:

  • Creates a connection to ActiveMQ and blocks on receive
  • After receiving, closes the connection and processes the message (database work and so on)
  • Once processing is done, the method returns

Then the cycle restarts (listening with receive, processing, etc.).

My question is: is there a better way to design this? My only hard requirement is to process messages in order of arrival and to finish processing one message before handling the next.

I've read a lot about JmsTemplate and so on, but I'm lost with all the information.

Currently, my best guess is to create a PooledConnectionFactory (because we will only use ActiveMQ, and CachingConnectionFactory seems to have limitations that matter in our architecture) and to limit it to one concurrentConsumer, then to use the MessageListener interface to process my messages.

Thanks

1 Answer

You can use the message-driven POJO infrastructure provided by the Spring Framework. It handles polling the destination, recovering from broker failures, and so on.

You can change your code (say, YourMessageListener) to implement MessageListener. The following config can then get you started:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://www.springframework.org/schema/beans 
   http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="messageListener" class="org.foo.YourMessageListener"/>

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://your-server:61616"/>
    </bean>

    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue/yourQueue"/>
    </bean>

    <bean id="jmsContainer" 
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="destination"/>
        <property name="concurrency" value="1"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>
</beans>

The important setting is the concurrency property, which limits the number of threads that can simultaneously handle messages on that destination. With it set to 1, a single thread consumes the messages, one at a time.
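Since the question mentions two queues, one possible way to extend the config is to declare a second listener container next to the first, each with its own concurrency of 1. The fragment below is only a sketch: the bean ids secondMessageListener, secondDestination and secondJmsContainer, the class org.foo.YourSecondMessageListener, and the queue name queue/yourSecondQueue are hypothetical placeholders, and it assumes the connectionFactory bean from the config above is defined in the same file.

```xml
<!-- Sketch only: place inside the same <beans> element as the config above. -->
<!-- All ids, the listener class name, and the queue name are hypothetical. -->

<bean id="secondMessageListener" class="org.foo.YourSecondMessageListener"/>

<bean id="secondDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="queue/yourSecondQueue"/>
</bean>

<!-- A separate container, so this queue gets its own single consumer thread. -->
<bean id="secondJmsContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <!-- Reuses the connectionFactory bean defined for the first container. -->
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="secondDestination"/>
    <!-- One consumer: a message is fully handled before the next is delivered. -->
    <property name="concurrency" value="1"/>
    <property name="messageListener" ref="secondMessageListener"/>
</bean>
```

With this layout, messages within each queue are handled one at a time, while the two queues are consumed independently of each other.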

See the documentation for more details.