0
votes

I'm trying to create a network of embedded brokers, the topology I'm trying to achieve is as follows:

enter image description here

So I want to run Broker 1 which would be the initial receiver of all the messages and hold a topic %X%. Then I want to connect Broker 2 and Broker 3 to Broker 1 and make them listen to Broker 1 through a network connection. Eventually I want to allow consumers to receive messages from %X% topic connecting to Broker 2 and Broker 3.

So far I've written the following code:

Broker 1:

BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();

Broker 2:

BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61617");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();

Broker 3:

BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61618");
broker.addNetworkConnector("static:(tcp://localhost:61616)");
broker.start();

Producer:

public class Producer {

    private Connection connection;

    public Producer() throws JMSException {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61616");

        connection = connectionFactory.createConnection();
        connection.start();
        ....

    }

    public void produceMessage(int x) {
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("Testtopic");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            String text = "Hello world " + x + "! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);
            System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());

            producer.send(message);
            session.close();
        }
         ......
    }
}

Consumer:

public class Consumer {
    public Consumer() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61618"); // BROKER 3
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createTopic("Testtopic"));
        consumer.setMessageListener(new HelloMessageListener());

    }

    private static class HelloMessageListener implements MessageListener {
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("Consumer " + Thread.currentThread().getName() + " received message: " + textMessage.getText());
              .......
        }

    }
}

However, when I connect to tcp://localhost:61618 (which is Broker 3) by consumer, I cannot receive any message. In the meantime if a connect directly to tcp://localhost:61616 (the initial receiver, Broker 1), consumer receives messages and everything goes well. I think I missed something in connectors configuration. Could you please help me with this?

Thanks,

Cheers

1
Do the networkBridges become active? Any log messages from ActiveMQ?Matt Pavlovich
@MattPavlovich it sends Exception in thread "main" javax.jms.JMSException: Wire format negotiation timeout: peer did not send his wire format.Andrey Yaskulsky

1 Answers

0
votes

The networkConnectors look to have the wrong port in the uri.

should be:

On broker1: .addNetworkConnector(tcp://localhost:61617) .addNetworkConnector(tcp://localhost:61618)

You don't need the network connectors from 2, 3 back to 1.

I also suggest configuring the full object and adding some parameters to bolster the config... duplex="false", networkTTL=1, etc.