A - Question
I know there is a similar question on SO, but it is not the same.
I'm trying to understand what goes on under the hood with MessageProducer and MessageConsumer in JMS. Using the ActiveMQ implementation, I've written a simple MessageProducer example that sends a message to a queue and a MessageConsumer example that consumes the message from the queue, while running ActiveMQ locally.
My initial assumption was that Connection#start is needed for sending a Message to a Queue. Calling Connection#start triggers ActiveMQSession#start, so I set my debug point at org.apache.activemq.ActiveMQSession#start.
The problem is that Connection#start is not explicitly needed for a MessageProducer but is needed for a MessageConsumer, even though both examples have to release their resources (session and connection). If I remove the Connection#start call from the producer, the code still runs, the debug point is never hit (not even under the hood), and I see the message in the queue. If I remove the Connection#start call from the consumer, the code does not work. So the question is: why is Connection#start not needed for a MessageProducer, where the code executes successfully without it, but needed for a MessageConsumer? And why can we skip Connection#start for a MessageProducer when we still have to close the connection to release its resources? It seems like a code smell.
I see that the field started is an AtomicBoolean. I'm not an expert on concurrency and multi-threading, so maybe someone can explain the logic behind why Connection#start is not mandatory for a MessageProducer.
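For reference, the javadoc of javax.jms.Connection states that a connection is created in stopped mode, that no messages are delivered to it until it is started, and that a message producer can still send while the connection is stopped. The sketch below is only a toy model of that contract, using plain JDK classes. ToyConnection, receiveNoWait and pendingCount are hypothetical names made up for illustration; this is not how ActiveMQ is implemented, and the real started flag may be used differently.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model only: hypothetical classes, NOT ActiveMQ's implementation.
class ToyConnection {
    // Stands in for the queue held by the broker.
    private final Queue<String> brokerQueue = new ArrayDeque<>();

    // A new connection begins in stopped mode, as the JMS contract describes.
    private final AtomicBoolean started = new AtomicBoolean(false);

    void start() {
        started.set(true);
    }

    // Sending never consults the started flag.
    void send(String text) {
        brokerQueue.add(text);
    }

    // Delivery is gated: a stopped connection hands nothing to consumers.
    String receiveNoWait() {
        return started.get() ? brokerQueue.poll() : null;
    }

    int pendingCount() {
        return brokerQueue.size();
    }
}

class ToyConnectionDemo {
    public static void main(String[] args) {
        ToyConnection connection = new ToyConnection();

        // The producer side works without start().
        connection.send("Hello StackOverflow!");
        System.out.println("Pending after send: " + connection.pendingCount());

        // The consumer side sees nothing until start() is called.
        System.out.println("Before start: " + connection.receiveNoWait());
        connection.start();
        System.out.println("After start: " + connection.receiveNoWait());
    }
}
```

In this model the flag only gates inbound delivery, which would be consistent with what I observe: the send succeeds on a connection that was never started, while the receive returns nothing until start is called.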
B - Example Code for JMS MessageProducer with ActiveMQ
package com.bzdgn.jms.stackoverflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSSendMessageToQueue {

    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";
        String messageContent = "Hello StackOverflow!";

        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);

        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();

        // Create session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Send Message to Queue
        Queue queue = session.createQueue(queueName);
        TextMessage msg = session.createTextMessage(messageContent);
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.send(msg);

        // Clear resources
        session.close();
        connection.close();
    }

}
C - Example Code for JMS MessageConsumer with ActiveMQ
package com.bzdgn.jms.stackoverflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumeMessageFromQueue {

    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";

        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);

        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();

        // Create session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Consume Message from the Queue
        Queue queue = session.createQueue(queueName);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        connection.start();
        Message message = messageConsumer.receive(500);

        if ( message != null ) {
            if ( message instanceof TextMessage ) {
                TextMessage textMessage = (TextMessage) message;
                String messageContent = textMessage.getText();
                System.out.println("Message Content: " + messageContent);
            }
        } else {
            System.out.println("No message in the queue: " + queueName);
        }

        // Clear resources
        session.close();
        connection.close();
    }

}
D - Configuration And Maven Dependency
The JDK version is 1.8. I'm running ActiveMQ 5.15.12 and using the same version for the client dependency:
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.12</version>
</dependency>
Connection#start is, in fact, not needed to send a message to a queue. – Justin Bertram