Summary
I want to asynchronously handle messages from an AMQP/RabbitMQ queue. I have implemented a @RabbitListener method (from spring-rabbit) for this, but the listener appears to be polling my queue under the hood. Is that to be expected? I would have expected the listener to be notified by RabbitMQ rather than having to poll.
If this is expected, is there a way to consume messages asynchronously with Spring AMQP without polling?
What I Have Observed
When I send a message, it is correctly picked up by the listener. However, I still see a continuous stream of log messages indicating that the listener keeps polling the empty queue:
…
15:41:10.543 [pool-1-thread-3] DEBUG o.s.a.r.l.BlockingQueueConsumer - ConsumeOK : Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.544 [main] DEBUG o.s.a.r.c.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2)
15:41:10.545 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [], routingKey = [myQueue]
Sent: Hello World
15:41:10.559 [pool-1-thread-4] DEBUG o.s.a.r.l.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:10.560 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Received message: (Body:'Hello World'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myQueue, deliveryTag=1, messageCount=0])
15:41:10.571 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.a.MessagingMessageListenerAdapter - Processing [GenericMessage [payload=Hello World, headers={timestamp=1435844470571, id=018f39f6-ebca-aabf-7fe3-a095e959f65d, amqp_receivedRoutingKey=myQueue, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=myQueue, amqp_consumerTag=amq.ctag-bUsK4KQN6_QHzf8DoDC_ww, amqp_contentEncoding=UTF-8, contentType=text/plain, amqp_deliveryTag=1, amqp_redelivered=false}]]
Received: Hello World
15:41:10.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:11.579 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
15:41:12.583 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-bUsK4KQN6_QHzf8DoDC_ww=myQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=MANUAL local queue size=0
…
The last log message ("Retrieving delivery for Consumer") repeats indefinitely, about once per second.
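To make the question more concrete, here is a minimal plain-Java sketch of the pattern the log output suggests to me: one thread stores pushed deliveries in a local in-memory queue ("Storing delivery", "local queue size=0"), while the listener thread repeatedly retrieves from that local queue with a timeout ("Retrieving delivery"). This is only my guess at what is going on; the class and method names (LocalQueuePollingSketch, storeDelivery, retrieveDelivery, runDemo) are hypothetical and are not spring-rabbit APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LocalQueuePollingSketch {

    // Hypothetical stand-in for a consumer-side, in-memory queue.
    // Assumption: this is NOT the RabbitMQ queue on the broker.
    private final BlockingQueue<String> localQueue = new LinkedBlockingQueue<>();

    // Would be called when the broker pushes a message to the client
    // (compare the "Storing delivery" log line).
    public void storeDelivery(String body) {
        localQueue.add(body);
    }

    // Would be called in a loop by the listener thread: waits up to the
    // timeout and returns null if nothing arrived
    // (compare the repeating "Retrieving delivery" log line).
    public String retrieveDelivery(long timeoutMillis) throws InterruptedException {
        return localQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Stores one message, then retrieves 'rounds' times and records what happened.
    public static List<String> runDemo(int rounds, long timeoutMillis)
            throws InterruptedException {
        LocalQueuePollingSketch consumer = new LocalQueuePollingSketch();
        List<String> events = new ArrayList<>();
        consumer.storeDelivery("Hello World");
        for (int i = 0; i < rounds; i++) {
            String body = consumer.retrieveDelivery(timeoutMillis);
            events.add(body == null ? "timeout" : "received: " + body);
        }
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        // A short timeout keeps the demo quick; my logs suggest about 1000 ms.
        System.out.println(runDemo(3, 100));
    }
}
```

If this guess is right, the repeating log line would be a timed wait on a local queue rather than a request to the broker, but I have not been able to confirm that from the logs alone.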
My Test Code
The first two methods are probably the most interesting part; the rest is mainly Spring configuration:
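import java.io.IOException;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@Configuration
@EnableRabbit
public class MyTest {

    public static void main(String[] args) throws InterruptedException {
        try (ConfigurableApplicationContext appCtxt =
                new AnnotationConfigApplicationContext(MyTest.class)) {
            // send a test message
            RabbitTemplate template = appCtxt.getBean(RabbitTemplate.class);
            Queue queue = appCtxt.getBean(Queue.class);
            template.convertAndSend(queue.getName(), "Hello World");
            System.out.println("Sent: Hello World");

            // Now that the application with its message listeners is running,
            // block this thread forever; make sure, though, that the
            // application context can sanely be closed.
            appCtxt.registerShutdownHook();
            Object blockingObj = new Object();
            synchronized (blockingObj) {
                blockingObj.wait();
            }
        }
    }

    // Listener with manual acknowledgement: ack each message after printing it.
    @RabbitListener(queues = "#{ @myQueue }")
    private void processHello(@Payload String msg,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
            throws IOException {
        System.out.println("Received: " + msg);
        channel.basicAck(deliveryTag, false);
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(rabbitConnFactory());
    }

    @Bean
    public ConnectionFactory rabbitConnFactory() {
        return new CachingConnectionFactory();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory
            rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory result =
                new SimpleRabbitListenerContainerFactory();
        result.setConnectionFactory(rabbitConnFactory());
        result.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return result;
    }

    // Non-durable queue named "myQueue".
    @Bean
    public Queue myQueue() {
        return new Queue("myQueue", false);
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(rabbitConnFactory());
    }
}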