2
votes

As I know ActiveMQ has a feature called AUTO Acknowledge that actually inform the broker that message has been received (not acknowledging the producer).

I want to know if it is possible to send acknowledgement from consumer to producer in ActiveMQ or RabbitMQ. then I want to handle the acknowledgment message in producer and if it wouldn't receive acknowledge then sending the message again to the consumer.

2
It sounds like either you don't trust your MQ or you're looking for synchronous/blocking calls. If it's the latter, why not go with something that's designed that way, like HTTP?chrylis -cautiouslyoptimistic-

2 Answers

0
votes

You want to perform a synchronous usecase over an asynchronous medium.

In RabbitMQ's case you can use RPC, as described here - https://www.rabbitmq.com/tutorials/tutorial-six-python.html and https://www.rabbitmq.com/direct-reply-to.html

Please notice that even authors advise to avoid it:

When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.


RabbitMQ Java client provides auto-acking through com.rabbitmq.client.Channel.basicConsume.

0
votes

At least for ActiveMQ - this is built in. You have to turn it on in activemq.xml

<policyEntry queue=">" advisoryForConsumed="true"/>

Simply listen the advisory topic for the queue you want to monitor consumed messages for. Then you can extract message id:s and what not to "tick off" outstanding requests.

For a complete end-to-end acknowledgement, I recommend something more custom. I.e. your producer-app should listen to some "response" queue that receives responses about the status of the produced message. I.e. if processing failed - you may want to know why etc..

Anyway, here is some code with a producer that also listens to acknowledgements from ActiveMQ.

public void run() throws Exception {
    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
    conn = cf.createConnection();
    sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination dest = sess.createQueue("duck");
    MessageConsumer mc = sess.createConsumer(AdvisorySupport.getMessageConsumedAdvisoryTopic(dest));
    mc.setMessageListener(this);
    conn.start();

    MessageProducer mp = sess.createProducer(sess.createQueue("duck"));
    mp.send(sess.createTextMessage("quack"));
}


public void onMessage(Message msg) {
    try {
        String msgId = msg.getStringProperty("orignalMessageId");
        System.out.println("Msg: " + msgId + " consumed");
    } catch ( Exception e) {
        e.printStackTrace();
    }
}