I'm trying to create a network of embedded ActiveMQ brokers. The topology I'm trying to achieve is as follows:
I want to run Broker 1, which would be the initial receiver of all messages and hold a topic %X%. Then I want to connect Broker 2 and Broker 3 to Broker 1 through network connections so that they listen to Broker 1. Eventually, consumers should be able to receive messages from topic %X% by connecting to Broker 2 or Broker 3.
So far I've written the following code:
Broker 1:
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
Broker 2:
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61617");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
Broker 3:
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61618");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();
Producer:
public class Producer {
    private Connection connection;

    public Producer() throws JMSException {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61616");
        connection = connectionFactory.createConnection();
        connection.start();
        ....
    }

    public void produceMessage(int x) {
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("Testtopic");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            String text = "Hello world " + x + "! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);
            System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);
            session.close();
        }
        ......
    }
}
Consumer:
public class Consumer {
    public Consumer() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61618"); // BROKER 3
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createTopic("Testtopic"));
        consumer.setMessageListener(new HelloMessageListener());
    }

    private static class HelloMessageListener implements MessageListener {
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("Consumer " + Thread.currentThread().getName() + " received message: " + textMessage.getText());
            .......
            }
        }
    }
}
However, when the consumer connects to tcp://localhost:61618 (Broker 3), it does not receive any messages. If I instead connect it directly to tcp://localhost:61616 (Broker 1, the initial receiver), the consumer receives messages and everything works. I think I missed something in the connector configuration. Could you please help me with this?
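My current understanding, which may be wrong, is that a network connector forwards messages from the broker that owns it to the brokers it lists. If so, the connector may need to be declared on Broker 1 and point at Brokers 2 and 3, rather than the other way around. Below is a minimal sketch of building such a static: URI. HubNetworkUri and build are hypothetical names of mine, not ActiveMQ API; only the resulting string would be passed to broker.addNetworkConnector(...).

```java
import java.util.StringJoiner;

// Hypothetical helper (not part of ActiveMQ): builds a "static:" discovery URI
// listing every broker that the hub broker should open a network bridge to.
final class HubNetworkUri {

    // host and ports are assumptions taken from the ports used in this post.
    static String build(String host, int... ports) {
        if (ports.length == 0) {
            throw new IllegalArgumentException("at least one remote broker port is required");
        }
        // Produces: static:(tcp://host:port1,tcp://host:port2,...)
        StringJoiner joiner = new StringJoiner(",", "static:(", ")");
        for (int port : ports) {
            joiner.add("tcp://" + host + ":" + port);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // On Broker 1 this string would replace the self-referencing connector URI.
        System.out.println(build("localhost", 61617, 61618));
    }
}
```

For ports 61617 and 61618 this yields static:(tcp://localhost:61617,tcp://localhost:61618), which is what I would try on Broker 1 instead of static:(tcp://localhost:61616).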
Thanks,
Cheers