2
votes

I am new to ActiveMQ. I have tried to implement producer-consumer (sender-receiver) in activemq. In my code, I am easily send & receive the messages from single producer to single consumer via ActiveMQ. But, the problem is, I can't send the message to multiple consumers from the same producer.

Here is my producer & consumer class.

MsgProducer.java

package jms_service;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MsgProducer {

      private static String url = "failover://tcp://localhost:61616";
      public static javax.jms.ConnectionFactory connFactory;
      public static javax.jms.Connection connection;
      public static javax.jms.Session mqSession;
      public static javax.jms.Topic topic;
      public static javax.jms.MessageProducer producer;

      public static void main(String[] args) throws JMSException {

          connFactory = new ActiveMQConnectionFactory(url);  
          connection = connFactory.createConnection("system","manager");
          connection.start(); 
          mqSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);  

          topic = mqSession.createTopic("RealTimeData");
          producer = mqSession.createProducer(topic);                  
          producer.setTimeToLive(30000);

          TextMessage message = mqSession.createTextMessage();      

          int seq_id =1;

          while(true)
            {             
                message.setText("Hello world | " +"seq_id #"+seq_id);               
                 producer.send(message);
                 seq_id++;

                 System.out.println("sent_msg =>> "+ message.getText());
               //  if(seq_id>100000) break;

                    try {
                        Thread.sleep(1000);
                        } 
                    catch (InterruptedException e) { e.printStackTrace();}           
              }       

    }

}

MsgConsumer.java

package jms_service;

import java.text.SimpleDateFormat;
import java.util.Calendar;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MsgConsumer {

          private static String url = "failover://tcp://localhost:61616";     
          public static javax.jms.ConnectionFactory connFactory;
          public static javax.jms.Connection connection;
          public static javax.jms.Session mqSession;
          public static javax.jms.Topic topic;
          public static javax.jms.MessageConsumer consumer;

        public static void main(String[] args) throws JMSException, InterruptedException {

            connFactory = new ActiveMQConnectionFactory(url);
            connection = connFactory.createConnection("system", "manager");
            connection.setClientID("0002");
            //connection.start();               
            mqSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
            topic = mqSession.createTopic("RealTimeData");
            consumer = mqSession.createDurableSubscriber(topic, "SUBS01");
            connection.start();

            MessageListener listner = new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            TextMessage txtmsg = (TextMessage) message;
                            Calendar cal = Calendar.getInstance();
                            //cal.getTime();
                            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
                            String time = sdf.format(cal.getTime());

                            String msg="received_message =>> "+ txtmsg.getText() + " | received_at :: "+time;
                            System.out.println(msg);

                            //consumer.sendData(msg);
                        }

                        } catch (JMSException e) {
                            System.out.println("Caught:" + e);
                            e.printStackTrace();
                            }
                    }
            };

            consumer.setMessageListener(listner);  

      }


}

Can anyone help to figure out the way for sending message to multiple consumers. Thanks in advance.

3
What's the problem exactly?Konstantin V. Salikhov
You have a hard-coded Client id ... I admit I don't know ActiveMQ, but I could imagine this to be a reason.Fildor
"If another connection with the same clientID is already running when this method is called, the JMS provider should detect the duplicate ID and throw an InvalidClientIDException."Fildor
It's not really clear what the problem is, what isn't working? What error do you get if any?Tim Bish

3 Answers

3
votes

Assuming your question is

Can anyone help to figure out the way for sending message to multiple consumers

and without reading through your complete code, an approach could be to put your clients in a collection

static Vector<consumer> vecConsumer;

where you put in every new client and give a reference to all existing clients. The broadcasting is just like sending to a single client, encapsulated in, for an example, a foreach loop

for(consumer cons : vecConsumer)
{
      //send stuff or put in sending queue
}
9
votes

Queue semantics deliver a message once-and-only-once across all consumers. This is per the JMS spec (a great read to understand the basics).

Topic semantics deliver a message to every consumer. So, a Topic may be the answer to your needs.

0
votes

Topics is the best route. One producer to many consumers or one publisher to many subscribers. With Queues you have go write a loop to to get all the possible consumers and use different destinations to send the messages. Your motive would also determine whether to use Queues or Topics.

  1. If u you think your consumers can be offline or have network issues then choose queues. In this case when they come back on they will receive the pending messages
  2. With topics there is no way they will receive the message when there is a disconnection unless u explicitly persist the message however new messages would overwrite the