2
votes

I have a spring boot app that uses Spring AMQP. The AMQP config is like this

@Configuration
public class AmqpConfig {
  @Bean
  DirectExchange directExchange() { return new DirectExchange("amq.direct"); }

  @Bean
  Queue testQueue() { return QueueBuilder.durable("test").build(); }

  @Bean
  Binding testBinding(Queue testQueue, DirectExchange directExchange) {
    return BindingBuilder.bind(testQueue).to(directExchange).with("test.routing.key");
  }

  @Bean
  SimpleRabbitListenerContainerFactory manualContainerFactory(ConnectionFactory connectionFactory,
                                                                     SimpleRabbitListenerContainerFactoryConfigurer configurer) {
    SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(containerFactory, connectionFactory);
    containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return containerFactory;
  }
}

So I'm using MANUAL acknowledgements. There's one listener

@Slf4j
@Component
public class ManualListener {
  @RabbitListener(queues = "test", containerFactory = "manualContainerFactory")
  public void processMsg(Message message, Channel channel, @Header(DELIVERY_TAG) long tag) throws IOException {
    try {
      log.info("Message received");
      Thread.sleep(20000);
      channel.basicAck(tag, false);
      log.info("Message processed");
    } catch (Exception e) {
      log.error("Something went wrong: {}", message, e);
      channel.basicNack(tag, false, false);
    }
  }
}

Thread.sleep(20000) is here to simulate some time-consuming process. My test case is:

  1. Send a message to the listener above
  2. During the 20 seconds sleep, restart rabbitmq. This effectively terminates all channels on the queue

So, what I expected to happen in this case is channel.basicAck to throw an exception that channel is closed, so that I can act accordingly (revert previous actions or similar). What actually happens is that basicAck finishes like everything is ok and CachingConnectionFactory just logs an exception in the background that PRECONDITION_FAILED for delivery.

2017-12-15 11:00:47.627  INFO 39397 --- [cTaskExecutor-1] .ManualListener : Message processed
2017-12-15 11:00:47.628 ERROR 39397 --- [ 127.0.0.1:5672] nnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

My question is: is there a reliably way for @RabbitListener to know when a message is successfully acknowledged?

Spring-boot: 1.5.2.RELEASE, Spring-rabbit: 1.7.1.RELEASE

EDIT:

I tried solution with ((ChannelProxy) channel).getTargetChannel() proposed by Gary Russell. It looks like it only mitigates the problem, there are still some messages that are falsely acked. I created this test, which I think it proves this.

@Slf4j
@Component
public class ManualListener {
  static int counter = 0;

  @RabbitListener(queues = "test", containerFactory = "manualContainerFactory")
  public void processMsg(Message message, Channel channel, @Header(DELIVERY_TAG) long tag) throws IOException {
      log.info("Message received with delivery tag {} and redelivered {}", message.getMessageProperties().getDeliveryTag(), message.getMessageProperties().getRedelivered());
      if (!message.getMessageProperties().getRedelivered()) {
        new Thread(() -> {
          try {
            channel.getConnection().close();
            log.info("Connection closed");
          } catch (Exception e) {
            log.error("Connection closed with timeout", e);
          }
        }).start();
      }
      new Thread(() -> {
        Channel actualChannel = ((ChannelProxy) channel).getTargetChannel();
        try {
          actualChannel.basicAck(tag, false);
          log.info("Number of acknowledged messages: {}", ++counter);
        } catch (Exception e) {
          log.error("Something went wrong: {}", message, e);
        }
      }).start();
  }
}

What I can see in the logs is:

2018-01-10 13:17:34.133  INFO 17250 --- [cTaskExecutor-1] .ManualListener : Message received with delivery tag 1 and redelivered false
2018-01-10 13:17:34.137  INFO 17250 --- [      Thread-27] .ManualListener : Number of acknowledged messages: 1
2018-01-10 13:17:34.163  INFO 17250 --- [      Thread-26] .ManualListener : Connection closed
2018-01-10 13:17:35.162  INFO 17250 --- [cTaskExecutor-2] .ManualListener : Message received with delivery tag 1 and redelivered true
2018-01-10 13:17:35.162  INFO 17250 --- [      Thread-28] .ManualListener : Number of acknowledged messages: 2

So in the test I acked the same message twice. What I would expect in this case to happen is to receive IOException when basicAck is called.

This problem occurs in our production environment, the app process large volumes of messages and acks quite often. Every time the connection is dropped, a couple of already acked messages is received.

1

1 Answers

2
votes

The channel proxy recovers (refreshes) the underlying rabbitmq channel when it is detected closed.

You could call isOpen(), but there is still a small race condition between that call and your basicAck().

EDIT

You could also add a shut down listener...

channel.addShutdownListener(s -> {
    System.out.println(s);
});
channel.basicAck(tag, false);

EDIT

Here's a reliable work-around...

@RabbitListener(queues = "foo")
public void foo(Message m, @Header(AmqpHeaders.CHANNEL) Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws Exception {
    System.in.read();
    Channel actualChannel = ((ChannelProxy) channel).getTargetChannel();
    try {
        actualChannel.basicAck(tag, false);
    }
    catch (Exception e) {
        e.printStackTrace();
    }
}

and

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:253)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:422)
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:416)
    at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1164)
    at com.example.So47454769Application.foo(So47454769Application.java:42)