0
votes

I have a RabbitMQ client application that listens to a specific queue. The client creates an instance of DefaultConsumer and implements the handleDelivery method. Here is the code:

    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    public void receiveMessages() {
        try {
//            channel.basicQos(pollCount);
            Message message = new Message();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String response = new String(body, "UTF-8");
                    if (response != null) {
                        message.setId(NUID.nextGlobal());
                        message.setPayload(response);
                        message.setDeliveryTag(deliveryTag);
                        messages.add(message);
                        logger.info("Message received: ", message.getPayload());
                    }
                }
            };
            logger.debug("**********Channel status: " + channel.isOpen());
            channel.basicConsume(queueName, false, consumer);
        } catch (Exception e) {
            logger.error("Exception while getting messages from Rabbit ", e);

        }
    }

The method receiveMessages() is called by a thread every 500 ms, and the messages are then drained into a different List for consumption. Because of this polling of receiveMessages(), I see consumer tags being created continuously, and the list keeps growing in the RabbitMQ management console. Is it normal to see those increasing consumer tags?

3 Answers

1
votes

Is it normal to see those increasing consumer tags?

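No, your code has an error. Every call to basicConsume registers a new consumer on the channel, so calling receiveMessages() every 500 ms keeps adding consumers. You need to either use a single long-running consumer or cancel your consumer when you are done with it.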
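The "register once, cancel when done" idea can be sketched with the JDK alone. `ConsumerRegistration` and its two function parameters are hypothetical names, not part of the RabbitMQ client. In a real application they would presumably wrap `channel.basicConsume(...)`, which returns the consumer tag, and `channel.basicCancel(tag)`, both of which throw checked exceptions that the wrappers would have to handle.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: registers a consumer at most once and remembers its tag. */
class ConsumerRegistration {
    private final Supplier<String> register; // assumed to wrap channel.basicConsume(...)
    private final Consumer<String> cancel;   // assumed to wrap channel.basicCancel(tag)
    private String consumerTag;              // null while no consumer is registered

    ConsumerRegistration(Supplier<String> register, Consumer<String> cancel) {
        this.register = register;
        this.cancel = cancel;
    }

    /** Safe to call repeatedly: only the first call registers a consumer. */
    synchronized String start() {
        if (consumerTag == null) {
            consumerTag = register.get();
        }
        return consumerTag;
    }

    /** Cancels the consumer if one is registered; otherwise does nothing. */
    synchronized void stop() {
        if (consumerTag != null) {
            cancel.accept(consumerTag);
            consumerTag = null;
        }
    }
}

class ConsumerRegistrationDemo {
    public static void main(String[] args) {
        AtomicInteger registrations = new AtomicInteger();
        ConsumerRegistration registration = new ConsumerRegistration(
                () -> "ctag-" + registrations.incrementAndGet(), // stand-in for basicConsume
                tag -> System.out.println("cancelled " + tag));  // stand-in for basicCancel

        // Simulate the 500 ms poller calling start() many times.
        for (int i = 0; i < 5; i++) {
            registration.start();
        }
        System.out.println("registrations: " + registrations.get());
        registration.stop();
    }
}
```

With a guard like this, a caller that still invokes the start method on a timer would no longer grow the consumer list, although simply calling it once is the simpler fix.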

I can't see any need to "poll" receiveMessages - just let it run on its own and it will add messages to your synchronized queue as you expect.
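As a rough illustration of that hand-off, the sketch below uses only the JDK: a callback thread adds to a `LinkedBlockingQueue`, and the polling thread only drains it, never touching the consumer registration. `MessageBuffer`, `onDelivery`, and `drain` are hypothetical names, and plain strings stand in for the `Message` type from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical buffer between the consumer callback and the polling thread. */
class MessageBuffer {
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    /** Called from the consumer callback for every delivery. */
    void onDelivery(String payload) {
        messages.add(payload);
    }

    /** Called by the poller: moves everything buffered so far into a new list. */
    List<String> drain() {
        List<String> batch = new ArrayList<>();
        messages.drainTo(batch); // thread-safe, preserves insertion order
        return batch;
    }
}

class MessageBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        MessageBuffer buffer = new MessageBuffer();

        // Stand-in for the long-running consumer delivering messages.
        Thread deliveries = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                buffer.onDelivery("message-" + i);
            }
        });
        deliveries.start();
        deliveries.join();

        // The poller only drains; it never registers another consumer.
        System.out.println(buffer.drain());
        System.out.println(buffer.drain());
    }
}
```

The poller can keep its 500 ms schedule; the difference is that it calls something like `drain()` instead of `receiveMessages()`.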


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

1
votes

I finally found a working solution. As Luke Bakken pointed out, no polling is required. I now call receiveMessages() only once. After that, my consumer receives callbacks as messages are published to the queue.

    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    public void receiveMessages() {
        try {
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                // Create a new Message per delivery so queued entries are not overwritten.
                Message message = new Message();
                String response = new String(delivery.getBody(), "UTF-8");
                message.setId(NUID.nextGlobal());
                message.setPayload(response);
                message.setDeliveryTag(delivery.getEnvelope().getDeliveryTag());
                messages.add(message);
                logger.info("Message received: " + message.getPayload());
            };
            // Register the consumer once. autoAck is false, so each delivery still
            // has to be acknowledged later using its stored delivery tag.
            channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
        } catch (Exception e) {
            logger.error("Exception while getting messages from Rabbit ", e);
        }
    }

The RabbitMQ management console now shows only one consumer tag entry under the bound queue.
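One detail worth care in a callback like this: the queue stores references, so each delivery needs its own message object. The JDK-only sketch below shows what happens when a single instance is reused. The nested `Message` class is a hypothetical stand-in for the real one.

```java
import java.util.concurrent.LinkedBlockingQueue;

class SharedMessageDemo {
    /** Hypothetical stand-in for the application's Message class. */
    static class Message {
        String payload;
    }

    /** Reuses one Message for every delivery: all queue entries alias the same object. */
    static LinkedBlockingQueue<Message> sharedInstance(String... payloads) {
        LinkedBlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        Message message = new Message(); // created once, outside the "callback"
        for (String payload : payloads) {
            message.payload = payload;   // overwrites what earlier entries point to
            queue.add(message);
        }
        return queue;
    }

    /** Creates a fresh Message per delivery: each entry keeps its own payload. */
    static LinkedBlockingQueue<Message> freshInstance(String... payloads) {
        LinkedBlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        for (String payload : payloads) {
            Message message = new Message();
            message.payload = payload;
            queue.add(message);
        }
        return queue;
    }

    public static void main(String[] args) {
        for (Message m : sharedInstance("first", "second")) {
            System.out.println("shared: " + m.payload);
        }
        for (Message m : freshInstance("first", "second")) {
            System.out.println("fresh: " + m.payload);
        }
    }
}
```

In the shared variant both entries report the last payload, which is why constructing the message inside the callback is the safer choice.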

0
votes

    public NotificationConsumerService(ConnectionFactory connectionFactory, String host, Logger logger) {
        this.connectionFactory = connectionFactory;
        this.host = host;
        this.logger = logger;
    }

    public void consumeSliceChangeNotification() {
        connectionFactory.setHost(this.host);
        try {
            // Deliberately not try-with-resources: closing the connection and channel
            // when this method returns would also shut down the consumer.
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                JSONObject obj = new JSONObject(message);
                String namespace = obj.getString("namespace");
                logger.info("The latest change notification on the " + namespace + " is available");
            };

            // autoAck is true here, so no manual acknowledgement is needed.
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }