0
votes

I have a query on RabbitMq consumer acknowledgement, I read documentation on RabbitMq stating that acknowledging the message should be on the same channel from which consumer received. But I'm in a situation where due to some reason consumer process is stopped after I received message and haven't acknowledge the Rabbitmq, when the consumer process is restarted, consumer starts getting unacknowledged messages from RabbitMq, but here consumer cannot send the acknowledge to those messages as I get a channel exception stating the tag doesn't belongs to the channel. So, my question is how to handle this scenario and how can I acknowledge the rabbitmq to remove the message after my consumer process is completed reading the message?

1

1 Answers

1
votes

As you said acknowledgement must be sent on the same channel

Acknowledgement must be sent on the same channel the delivery it is for was received on. Attempts to acknowledge using a different channel will result in a channel-level protocol exception

The easy way to do that is to use autoack=true, so the message acknowledged automatically once it is consumed.

boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

EDIT

if auto_ack does not work for you, you could use channel_consumer.basicCancel(consumerTag);

something like that:

   final Consumer consumer = new DefaultConsumer(channel_consumer) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          String message = new String(body, "UTF-8");

          System.out.println(" [x] Received '" + message + "'");
          try {
            channel_consumer.basicCancel(consumerTag);
            System.out.println(" [x] stopping" + message + "'");

            try {
              Thread.sleep(10000);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
            System.out.println(" [x]  elaborated getting ack" + message + "'");
            channel_consumer.basicAck(envelope.getDeliveryTag(), false);

          } finally {
            System.out.println(" [x] Done");
          }
        }
      };
      boolean autoAck = false; // acknowledgment is covered below
      channel_consumer.basicConsume("test", autoAck, consumer);