Background: I have a standard producer-consumer queue; the consumers are slow and the producers are fast. The expectation is that whenever a consumer finishes processing a message it acknowledges the message, and the producer then assumes the task associated with that message is done. Since the producers are fast, I don't want the producing threads to wait; instead, a callback should be invoked whenever a message is acknowledged. Since JMS is limited on this front, I have used ActiveMQ classes such as ActiveMQMessageProducer directly as much as possible.

Problem: The messages are being auto-acknowledged: the registered async callback is invoked even if the consumer hasn't started yet. The send method in question is:

public void send(Destination destination, Message message, AsyncCallback onComplete)

Producer

public static boolean setup() {
    Producer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    // Create a connection
    Producer.connection = (ActiveMQConnection) connectionFactory.createConnection();
    connection.setAlwaysSessionAsync(true);
    connection.start();
    return true; // the method is declared boolean, so it has to return a value
}

public Producer() {
    session = (ActiveMQSession) connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
    producer = (ActiveMQMessageProducer) session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
...

public void run() {
    long id = messageID.getAndIncrement();
    String text = "Hello world!";
    Message message = session.createTextMessage(text);
    // The callback is invoked asynchronously once the send completes
    producer.send(message, new MessageCompletion(id, this.messageRundown));
}

Consumer

public static boolean setup() {     
    Consumer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");    
    Consumer.connection = (ActiveMQConnection)connectionFactory.createConnection();
    connection.setAlwaysSessionAsync(true);         
    return true;
}

public  Consumer() {
    session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    destination = (ActiveMQDestination)session.createQueue("TEST.FOO");
    consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
    consumer.setMessageListener(this);
    connection.start();
}

// implements MessageListener
@Override
public void onMessage(Message message) {    
    messageQueue.add(message);
}
public void run() {
    while(true) {
        Message message = messageQueue.poll();
        while(message != null) {
            // do some work             
            message.acknowledge();
            message = messageQueue.poll();              
        }
        Thread.sleep(10000);            
    }
}

Although the consumer is not needed, I have added it for reference. Parts have been removed for brevity; this is taken from working code.

1 Answer

Your understanding of the way acknowledgement works is wrong. The async callback on the sender only tells you that the broker has received the message; it says nothing about whether a consumer has seen it. If it were a persistent send, the callback would also indicate that the message had been written to disk.

There is no coupling of producer and consumer in JMS or in most other messaging brokers. The producer places a message on a queue, and the consumer can come along at any later point and consume from that queue. A producer therefore can't wait for a consumer before going on to produce the next message.

If you want to know when particular messages have been processed, so that you can throttle work, look into the JMS request/response messaging pattern.
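
To make the request/response idea concrete, here is a minimal, broker-free sketch that uses only the JDK. It is not ActiveMQ code: the two in-memory queues stand in for a request queue and a reply queue, and every name in it (RequestReplySketch, Envelope, send, consumeOne, dispatchOneReply) is hypothetical. In real JMS the same roles would typically be played by the JMSReplyTo and JMSCorrelationID message headers. The point is that the producer never blocks on the send, and its completion callback fires only after the consumer has actually processed the message and replied.

```java
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class RequestReplySketch {
    /** Hypothetical stand-in for a JMS message: a correlation ID plus a body. */
    public static final class Envelope {
        final String correlationId;
        final String body;

        Envelope(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Assumption: in-memory stand-ins for the request and reply queues on a broker.
    static final Queue<Envelope> requests = new ConcurrentLinkedQueue<>();
    static final Queue<Envelope> replies = new ConcurrentLinkedQueue<>();

    // Producer side: futures still waiting for a reply, keyed by correlation ID.
    static final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Enqueues a request without blocking; the future completes when the reply arrives. */
    public static CompletableFuture<String> send(String body) {
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> done = new CompletableFuture<>();
        pending.put(id, done);
        requests.add(new Envelope(id, body));
        return done;
    }

    /** Consumer side: processes one request and replies with the same correlation ID. */
    public static boolean consumeOne() {
        Envelope request = requests.poll();
        if (request == null) {
            return false; // nothing to do yet
        }
        replies.add(new Envelope(request.correlationId, "processed: " + request.body));
        return true;
    }

    /** Producer side: matches one reply to its pending future and completes it. */
    public static boolean dispatchOneReply() {
        Envelope reply = replies.poll();
        if (reply == null) {
            return false; // no reply has arrived yet
        }
        CompletableFuture<String> done = pending.remove(reply.correlationId);
        if (done != null) {
            done.complete(reply.body);
        }
        return true;
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = send("Hello world!");
        // The send has returned, but no consumer has run yet.
        System.out.println("completed before consumer ran: " + done.isDone()); // false
        consumeOne();
        dispatchOneReply();
        System.out.println("reply: " + done.join()); // reply: processed: Hello world!
    }
}
```

A callback attached with done.thenAccept(...) would play the role the question wanted from the send callback. The number of entries in pending is also the number of messages in flight, which is one possible way for a fast producer to throttle itself.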