I have a spring boot app that uses Spring AMQP. The AMQP config is like this
@Configuration
public class AmqpConfig {
@Bean
DirectExchange directExchange() { return new DirectExchange("amq.direct"); }
@Bean
Queue testQueue() { return QueueBuilder.durable("test").build(); }
@Bean
Binding testBinding(Queue testQueue, DirectExchange directExchange) {
return BindingBuilder.bind(testQueue).to(directExchange).with("test.routing.key");
}
@Bean
SimpleRabbitListenerContainerFactory manualContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
configurer.configure(containerFactory, connectionFactory);
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return containerFactory;
}
}
So I'm using MANUAL acknowledgements. There's one listener
@Slf4j
@Component
public class ManualListener {
@RabbitListener(queues = "test", containerFactory = "manualContainerFactory")
public void processMsg(Message message, Channel channel, @Header(DELIVERY_TAG) long tag) throws IOException {
try {
log.info("Message received");
Thread.sleep(20000);
channel.basicAck(tag, false);
log.info("Message processed");
} catch (Exception e) {
log.error("Something went wrong: {}", message, e);
channel.basicNack(tag, false, false);
}
}
}
Thread.sleep(20000)
is here to simulate some time-consuming process. My test case is:
- Send a message to the listener above
- During the 20 seconds sleep, restart rabbitmq. This effectively terminates all channels on the queue
So, what I expected to happen in this case is channel.basicAck
to throw an exception that channel is closed, so that I can act accordingly (revert previous actions or similar). What actually happens is that basicAck
finishes like everything is ok and CachingConnectionFactory
just logs an exception in the background that PRECONDITION_FAILED for delivery.
2017-12-15 11:00:47.627 INFO 39397 --- [cTaskExecutor-1] .ManualListener : Message processed
2017-12-15 11:00:47.628 ERROR 39397 --- [ 127.0.0.1:5672] nnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
My question is: is there a reliably way for @RabbitListener
to know when a message is successfully acknowledged?
Spring-boot: 1.5.2.RELEASE, Spring-rabbit: 1.7.1.RELEASE
EDIT:
I tried solution with ((ChannelProxy) channel).getTargetChannel()
proposed by Gary Russell. It looks like it only mitigates the problem, there are still some messages that are falsely acked. I created this test, which I think it proves this.
@Slf4j
@Component
public class ManualListener {
static int counter = 0;
@RabbitListener(queues = "test", containerFactory = "manualContainerFactory")
public void processMsg(Message message, Channel channel, @Header(DELIVERY_TAG) long tag) throws IOException {
log.info("Message received with delivery tag {} and redelivered {}", message.getMessageProperties().getDeliveryTag(), message.getMessageProperties().getRedelivered());
if (!message.getMessageProperties().getRedelivered()) {
new Thread(() -> {
try {
channel.getConnection().close();
log.info("Connection closed");
} catch (Exception e) {
log.error("Connection closed with timeout", e);
}
}).start();
}
new Thread(() -> {
Channel actualChannel = ((ChannelProxy) channel).getTargetChannel();
try {
actualChannel.basicAck(tag, false);
log.info("Number of acknowledged messages: {}", ++counter);
} catch (Exception e) {
log.error("Something went wrong: {}", message, e);
}
}).start();
}
}
What I can see in the logs is:
2018-01-10 13:17:34.133 INFO 17250 --- [cTaskExecutor-1] .ManualListener : Message received with delivery tag 1 and redelivered false
2018-01-10 13:17:34.137 INFO 17250 --- [ Thread-27] .ManualListener : Number of acknowledged messages: 1
2018-01-10 13:17:34.163 INFO 17250 --- [ Thread-26] .ManualListener : Connection closed
2018-01-10 13:17:35.162 INFO 17250 --- [cTaskExecutor-2] .ManualListener : Message received with delivery tag 1 and redelivered true
2018-01-10 13:17:35.162 INFO 17250 --- [ Thread-28] .ManualListener : Number of acknowledged messages: 2
So in the test I acked the same message twice. What I would expect in this case to happen is to receive IOException
when basicAck
is called.
This problem occurs in our production environment, the app process large volumes of messages and acks quite often. Every time the connection is dropped, a couple of already acked messages is received.