
I have implemented an ActiveMQ pub/sub program using Java and Stomp.js. There is a single producer, written in Java, and the consumers are written in JavaScript.

Here's the problem scenario:

  • The producer keeps publishing messages to a topic.
  • Consumer A connects and subscribes to that topic.
  • Consumer B connects and subscribes to the same topic.
  • Now A and B listen to the same topic simultaneously, but the data A receives is not the same as what B receives; B skips some messages.
  • When I disconnect A, B works fine.
  • When I disconnect B, A works fine.

Here's the producer code:

public static Session SESSION;

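/**
 * Lazily creates the shared JMS session.
 *
 * @return the shared JMS session
 * @throws JMSException if the connection or session cannot be created
 */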
public static Session getSessionInstance() throws JMSException, IOException {
    if (null == SESSION) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(Context.getSystemProperties().getAmp().getUrl());
        Connection connection = connectionFactory.createConnection();
        connectionFactory.getPrefetchPolicy().setAll(1);
        connection.start();

        SESSION = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }
    return SESSION;
}

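/**
 * Sends a text message to the named destination.
 *
 * @param topic name of the destination to send to
 * @param message text body of the message
 * @throws JMSException if the message cannot be sent
 */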
public static void sendMessage(String topic, String message) throws JMSException, IOException {
    Session session = getSessionInstance();
    Destination destination = session.createQueue(topic);
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    TextMessage txtMessage = session.createTextMessage(message);
    producer.send(txtMessage);

    producer.close();
}

And the consumer:

var client = Stomp.client("ws://localhost:61614?consumer.prefetchSize=1", "v11.stomp");
client.debug = null;
var selectedVehicleImei = 741852963123456;
client.connect("", "", function (topic) {
    client.subscribe("COO." + selectedVehicleImei, function (message) {
        var infodata = JSON.parse(message.body);
        console.log(infodata);
    })
})

I've tried setting the prefetch values. In the producer:

connectionFactory.getPrefetchPolicy().setAll(1);

and in the consumer:

?consumer.prefetchSize=1

But still no luck. What is the problem here, and how can I make this work?


1 Answer


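I think you need to use createTopic instead of createQueue. Even though you named the variable topic, the destination is actually created as a queue, so in sendMessage the call session.createQueue(topic) would become session.createTopic(topic).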

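Queues are for tasks that only need to be handled by one consumer (like sending an email): each message is delivered to just one of the connected consumers. Topics deliver a copy of every message to each active subscriber, which is the behaviour you are expecting here.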
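
To make the difference concrete, here is a minimal, broker-free sketch of the two delivery models in plain Java. The class and method names (QueueVsTopicDemo, deliverAsQueue, deliverAsTopic) are made up for illustration and are not part of JMS or ActiveMQ. The round-robin split is also a simplifying assumption: a real broker decides which queue consumer gets a message based on prefetch and consumer availability, but the point that each message reaches only one consumer still holds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class QueueVsTopicDemo {

    /**
     * Queue semantics: each message is handed to exactly one consumer.
     * Round-robin is used here only as a stand-in for the broker's dispatch policy.
     */
    public static List<List<String>> deliverAsQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    /** Topic semantics: every subscriber receives its own copy of every message. */
    public static List<List<String>> deliverAsTopic(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    /** Creates one empty inbox per consumer. */
    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        // Consumers A and B split the messages between them.
        System.out.println("queue: " + deliverAsQueue(messages, 2));
        // Consumers A and B both see every message.
        System.out.println("topic: " + deliverAsTopic(messages, 2));
    }
}
```

With two consumers, the queue version gives A and B disjoint halves of the stream, which matches the "B skips some data" symptom, while the topic version gives both the full stream. In the question's setup the Stomp.js subscription would presumably also need to address the topic rather than a queue; ActiveMQ's STOMP support uses a /topic/ destination prefix for that, so the subscribe destination would look like "/topic/COO." + selectedVehicleImei. That part is an assumption about the consumer side and is not confirmed by the original answer.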