2
votes

I am new to jms. The goal is to process messages concurrently from a queue in an asynchronous listener's onMessage method by attaching a listener instance to multiple consumer's with each consumer using its own session and running in a separate thread, that way the messages are passed on to the different consumers for concurrent processing.

1) Is it possible to process messages concurrently from a single queue by creating multiple consumers ? 2) I came up with the below code, but would like to get your thoughts on whether the below code looks correct for what I want to accomplish.

public class QueueConsumer implements Runnable, MessageListener {

public static void main(String[] args) {




    QueueConsumer consumer1 = new QueueConsumer();
    QueueConsumer consumer2 = new QueueConsumer();
    try {
        consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON");
        consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON");
    } catch (JMSException ex) {
        ex.printStackTrace();
        System.exit(-1);
    }


    Thread newThread1 = new Thread(consumer1);
    Thread newThread2 = new Thread(consumer1);
    newThread1.start();
    newThread2.start();



}


private static String connectionFactoryName = null;
private static String queueName = null;


private static ConnectionFactory qcf = null;
private static Connection queueConnection = null;


private Session ses = null;
private Destination queue = null;
private MessageConsumer msgConsumer = null;

public static final Logger logger = LoggerFactory
        .getLogger(QueueConsumer.class);

public QueueConsumer() {
    super();
}

public void onMessage(Message msg) {
    if (msg instanceof TextMessage) {
        try {

            //process message

        } catch (JMSException ex) {
            ex.printStackTrace();

        }
    }

}

public void run() {

    try {
        queueConnection.start();
    } catch (JMSException e) {

        e.printStackTrace();

        System.exit(-1);
    }
    while (!Thread.currentThread().isInterrupted()) {
        synchronized (this) {
            try {
                wait();
            } catch (InterruptedException ex) {
                break;
            }
        }
    }

}



public void init(String factoryName, String queue2) throws JMSException {
    try {

        qcf = new JMSConnectionFactory(factoryName);


        queueConnection = qcf.createConnection();


        ses = queueConnection.createSession(false,
                Session.CLIENT_ACKNOWLEDGE);
        queue = ses.createQueue(queue2);
        logger.info("Subscribing to destination: " + queue2);

        msgConsumer = ses.createConsumer(queue);


        msgConsumer.setMessageListener(this);

        System.out.println("Listening on queue " + queue2);

    } catch (Exception e) {
        e.printStackTrace();
        System.exit(-1);
    }

}

private static void setConnectionFactoryName(String name) {
    connectionFactoryName = name;
}

private static String getQueueName() {
    return queueName;
}

private static void setQueueName(String name) {
    queueName = name;
}

}

3

3 Answers

2
votes
  1. Yes absolutely
  2. I only took a brief look and I noticed that you pass the wrong consumer to your second thread:

    Thread newThread2 = new Thread(consumer1); // has to pass consumer2
    

    beside of this, some variables such as ConnectionFactory are static and initialized multiple times/overriden. You only need one connection that could create multiple sessions and/or consumers.

0
votes

Related to the code example you provided, It is not recommanded by Oracle to create low-level threads on a deployed application. Example for Weblogic : Using Threads in WebLogic Server

0
votes

Instead in the applicationcontext.xml, where you have made the bean of mail container, you can add concurrent consumer property which would be a better approach.

<bean id="jmsMailContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers">
    <value>100</value>
</property>
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="mailDestination"/>
<property name="messageListener" ref="jmsMailConsumer"/>