I have a standard ActiveMQ broker:

    private static String localVMurl = "vm://localhost";

    broker = new BrokerService();
    broker.addConnector(localVMurl); 
    broker.start();

All is well so far. My goal is for a consumer to connect to the broker and subscribe to a specific topic. Once that subscription is detected, the broker should either pass messages along if a producer is already publishing to the topic, or start a new producer for that topic. To do this, I need to somehow detect when a new consumer connects and requests a specific topic.

My basic consumer code:

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(remoterURL);
    Connection connection = connectionFactory.createConnection();
    connection.setClientID("clinet1");
    connection.start();

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic("some_topic");
    MessageConsumer consumer = session.createConsumer(topic);
    consumer.setMessageListener(new MyMessageListener());

I can see this in the broker logs:

    <161005 11:03:41> [.0.1:64433@5001] DEBUG tRegion - localhost adding destination: topic://ActiveMQ.Advisory.Consumer.Topic.some_topic

So I know the consumer connects and subscribes to this topic; I just need to catch that event somehow.

Any thoughts on how to do this?

1 Answer

Advisory messages are what you need. Subscribe to the consumer advisory topic for your destination; each message you receive there means that a consumer on that destination has started or stopped.

Documentation: http://activemq.apache.org/advisory-message.html
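The advisory topic name in the question's broker log appears to be the prefix `ActiveMQ.Advisory.Consumer.Topic.` followed by the name of the topic being watched. As a rough sketch under that assumption, the helper below builds such a name and recovers the watched topic from it. The class `ConsumerAdvisoryNames` and its methods are hypothetical and not part of ActiveMQ; it uses only the Java standard library.

```java
import java.util.Optional;

// Hypothetical helper, not an ActiveMQ class.
final class ConsumerAdvisoryNames {
    // Prefix copied from the broker log line in the question (assumption:
    // it is the same for every topic consumer advisory).
    static final String TOPIC_CONSUMER_PREFIX = "ActiveMQ.Advisory.Consumer.Topic.";

    private ConsumerAdvisoryNames() {}

    // Builds the advisory topic name for a plain topic name.
    static String advisoryTopicFor(String topicName) {
        return TOPIC_CONSUMER_PREFIX + topicName;
    }

    // Returns the watched topic name, or empty if the input is not a
    // topic consumer advisory name.
    static Optional<String> watchedTopic(String advisoryTopicName) {
        if (advisoryTopicName == null
                || !advisoryTopicName.startsWith(TOPIC_CONSUMER_PREFIX)
                || advisoryTopicName.length() == TOPIC_CONSUMER_PREFIX.length()) {
            return Optional.empty();
        }
        return Optional.of(advisoryTopicName.substring(TOPIC_CONSUMER_PREFIX.length()));
    }
}
```

In real code it is safer to let `AdvisorySupport.getConsumerAdvisoryTopic` produce the destination than to assemble the name by hand; the sketch is only meant to make the naming visible.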

Example:

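    import javax.jms.*;
    import org.apache.activemq.advisory.AdvisorySupport;
    import org.apache.activemq.command.ActiveMQMessage;
    import org.apache.activemq.command.ConsumerInfo;
    import org.apache.activemq.command.DataStructure;
    import org.apache.activemq.command.RemoveInfo;

    // Subscribe to the consumer advisory topic for the destination of interest.
    // See org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(Destination).
    Destination advisoryDestination = AdvisorySupport.getConsumerAdvisoryTopic(topic);
    MessageConsumer consumer = session.createConsumer(advisoryDestination);
    consumer.setMessageListener(this);

    // Called for every advisory; "this" must implement javax.jms.MessageListener.
    public void onMessage(Message msg) {
        if (msg instanceof ActiveMQMessage) {
            ActiveMQMessage aMsg = (ActiveMQMessage) msg;
            DataStructure data = aMsg.getDataStructure();
            if (data instanceof ConsumerInfo) {
                // A consumer has subscribed to the topic.
                ConsumerInfo consumerInfo = (ConsumerInfo) data;
            } else if (data instanceof RemoveInfo) {
                // A consumer has unsubscribed or disconnected.
            }
        }
    }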
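
For the "start a producer only if none is running" part of the question, one possible approach is to keep a per-topic registry and consult it from `onMessage` whenever a consumer start is reported. The following is a minimal sketch using only the Java standard library. `ProducerRegistry`, `onConsumerAdded`, and the factory function are hypothetical names introduced for illustration, and how a producer is actually created is left to the caller.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical helper: tracks at most one producer per topic name.
final class ProducerRegistry<P> {
    private final ConcurrentMap<String, P> producers = new ConcurrentHashMap<>();
    private final Function<String, P> producerFactory;

    // producerFactory creates and starts a producer for a topic name;
    // it must not return null.
    ProducerRegistry(Function<String, P> producerFactory) {
        this.producerFactory = producerFactory;
    }

    // Call when an advisory reports a new consumer on topicName.
    // Returns true if this call started a new producer, false if one
    // was already registered for the topic.
    boolean onConsumerAdded(String topicName) {
        boolean[] created = {false};
        // computeIfAbsent runs the factory at most once per missing key.
        producers.computeIfAbsent(topicName, name -> {
            created[0] = true;
            return producerFactory.apply(name);
        });
        return created[0];
    }

    // True if a producer is currently registered for the topic.
    boolean isProducing(String topicName) {
        return producers.containsKey(topicName);
    }
}
```

From the listener, the `ConsumerInfo` branch could then call `onConsumerAdded` with the name of the topic the advisory refers to. Removing a producer once the last consumer leaves is not covered by this sketch.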