3
votes

Summary

I want to asynchronously handle messages from an AMQP/RabbitMQ queue. I have implemented a @RabbitListener method (from spring-rabbit) for this but it seems that this listener is actually polling my queue under the hood. Is that to be expected? I would have expected the listener to somehow be notified by RabbitMQ instead of having to poll.

If it’s to be expected, can I somehow also consume messages asynchronously with Spring AMQP without polling?

What I Have Observed

When I send a message, it is correctly picked up by the listener. I still see a continuous stream of log messages which indicate that the listener continues to poll the empty queue:

…
15:41:10.543 [pool-1-thread-3] DEBUG o.s.a.r.l.BlockingQueueConsumer - ConsumeOK : Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.544 [main] DEBUG o.s.a.r.c.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [], routingKey = [myQueue]
Sent: Hello World
15:41:10.559 [pool-1-thread-4] DEBUG o.s.a.r.l.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.560 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Received message: (Body:'Hello World'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myQueue, deliveryTag=1, messageCount=0])
15:41:10.571 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=Hello World, headers={timestamp=1435844470571, id=018f39f6-ebca-aabf-7fe3-a095e959f65d, amqp_receivedRoutingKey=myQueue, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=myQueue, amqp_consumerTag=amq.ctag-bUsK4KQN6_QHzf8DoDC_ww, amqp_contentEncoding=UTF-8, contentType=text/plain, amqp_deliveryTag=1, amqp_redelivered=false}]]
Received: Hello World
15:41:10.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:11.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:12.583 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
…

The last log message basically repeats infinitely every second.

My Test Code

The first two methods are probably the most interesting part; the rest is mainly Spring configuration:

@Configuration
@EnableRabbit
public class MyTest {

    public static void main(String[] args) throws InterruptedException {
        try (ConfigurableApplicationContext appCtxt =
                new AnnotationConfigApplicationContext(MyTest.class)) {
            // send a test message
            RabbitTemplate template = appCtxt.getBean(RabbitTemplate.class);
            Queue queue = appCtxt.getBean(Queue.class);
            template.convertAndSend(queue.getName(), "Hello World");
            System.out.println("Sent: Hello World");

            // Now that the application with its message listeners is running,
            // block this thread forever; make sure, though, that the
            // application context can sanely be closed.
            appCtxt.registerShutdownHook();
            Object blockingObj = new Object();
            synchronized (blockingObj) {
                blockingObj.wait();
            }
        }
    }

    @RabbitListener(queues = "#{ @myQueue }")
    private void processHello(@Payload String msg,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
            throws IOException {
        System.out.println("Received: " + msg);
        channel.basicAck(deliveryTag, false);
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnFactory());
    }

    @Bean
    public ConnectionFactory rabbitConnFactory() {
        return new CachingConnectionFactory();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory
            rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory result =
                new SimpleRabbitListenerContainerFactory();
        result.setConnectionFactory(rabbitConnFactory());
        result.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return result;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myQueue", false);
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(rabbitConnFactory());
    }
}
1

1 Answers

5
votes

It's not polling rabbitmq; when a message arrives asynchronously from rabbit, it is placed in an internal queue in the consumer; handing over the message to the listener thread which is blocked, waiting for the arrival.

The DEBUG message you are seeing is after the listener thread times out waiting for a new message to arrive from rabbitmq.

You can increase the receiveTimeout to reduce the logs, or simply disable DEBUG logging for the BlockingQueueConsumer.

Increasing the timeout will make the container less responsive to container stop() requests.

EDIT:

In response to your comment below...

Yes, we could interrupt the thread but it's a bit more involved than that. The receive timeout is also used to ack messages when txSize is > 1.

Let's say you only want to ack every 20 messages (instead of every message). People do that to improve performance in high volume environments. The timeout is also used to ack (the txSize is actually every n messages or timeouts).

Now, let's say 19 messages arrive then none for 60 seconds and your timeout is 30 seconds.

This will mean that the 19 messages are un-acked for a long time. With the default configuration, the ack will be sent 1 second after the 19th message arrives.

There really is little overhead in this timeout (we simply loop back and wait again), so it is unusual for it to be increased.

Also, while the container is stopped when the context is closed, people stop and start containers all the time.