I am new to jms. The goal is to process messages concurrently from a queue in an asynchronous listener's onMessage method by attaching a listener instance to multiple consumer's with each consumer using its own session and running in a separate thread, that way the messages are passed on to the different consumers for concurrent processing.
1) Is it possible to process messages concurrently from a single queue by creating multiple consumers ? 2) I came up with the below code, but would like to get your thoughts on whether the below code looks correct for what I want to accomplish.
public class QueueConsumer implements Runnable, MessageListener {
public static void main(String[] args) {
QueueConsumer consumer1 = new QueueConsumer();
QueueConsumer consumer2 = new QueueConsumer();
try {
consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON");
consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON");
} catch (JMSException ex) {
ex.printStackTrace();
System.exit(-1);
}
Thread newThread1 = new Thread(consumer1);
Thread newThread2 = new Thread(consumer1);
newThread1.start();
newThread2.start();
}
private static String connectionFactoryName = null;
private static String queueName = null;
private static ConnectionFactory qcf = null;
private static Connection queueConnection = null;
private Session ses = null;
private Destination queue = null;
private MessageConsumer msgConsumer = null;
public static final Logger logger = LoggerFactory
.getLogger(QueueConsumer.class);
public QueueConsumer() {
super();
}
public void onMessage(Message msg) {
if (msg instanceof TextMessage) {
try {
//process message
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
public void run() {
try {
queueConnection.start();
} catch (JMSException e) {
e.printStackTrace();
System.exit(-1);
}
while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
try {
wait();
} catch (InterruptedException ex) {
break;
}
}
}
}
public void init(String factoryName, String queue2) throws JMSException {
try {
qcf = new JMSConnectionFactory(factoryName);
queueConnection = qcf.createConnection();
ses = queueConnection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
queue = ses.createQueue(queue2);
logger.info("Subscribing to destination: " + queue2);
msgConsumer = ses.createConsumer(queue);
msgConsumer.setMessageListener(this);
System.out.println("Listening on queue " + queue2);
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
private static void setConnectionFactoryName(String name) {
connectionFactoryName = name;
}
private static String getQueueName() {
return queueName;
}
private static void setQueueName(String name) {
queueName = name;
}
}