
I'm running a Java application that uses RabbitMQ Server 3.8.9, spring-amqp-2.2.10.RELEASE, and spring-rabbit-2.2.10.RELEASE.

My test case does something like the following:

  1. Start the RabbitMQ Server
  2. Start my Java application
  3. Test and validate some functionality on my Java application
  4. Gracefully stop my Java application
  5. Gracefully stop the RabbitMQ Server
  6. Repeat steps 1-5 a few more times

Everything looks fine, except that sometimes, about 10 minutes into one of the restarts, I see the following error in my application's logs:

2021-02-05 12:52:46.498 UTC,ERROR,org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl,null,rabbitConnectionFactory23,runWorker():1149,Failed to invoke afterAckCallback
java.lang.NullPointerException: null
    at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.lambda$doHandleConfirm$1(PublisherCallbackChannelImpl.java:1027) ~[spring-rabbit.jar:2.2.10.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_181]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]

Further analysis doesn't point to anything specific: there are no errors in the RabbitMQ log files, no restarts of the RabbitMQ server, and nothing unusual in the RabbitMQ logs around the timestamp above.

The code in question:

https://github.com/spring-projects/spring-amqp/blob/v2.2.10.RELEASE/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java#L1027

My tests are automated and run as part of a CI pipeline. The issue is intermittent and I have had trouble reproducing it locally in my sandbox.

From what I can tell, the functionality of my Java application is unaffected.

Code that creates the RabbitMQ connection factory used everywhere:

final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(HOST_NAME);
connectionFactory.setChannelCacheSize(1);
connectionFactory.setPublisherConfirms(true);

It seems like a concurrency problem, but I'm not sure how to get to the bottom of it. For the most part, we use the RabbitTemplate and other Spring facilities to connect to RabbitMQ.

Anyone in the Spring world with some knowledge in RabbitMQ care to chime in?

Thanks


1 Answer


The code you are referring to looks like this:

finally {
    try {
        if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
            this.afterAckCallback.accept(this);
            this.afterAckCallback = null;
        }
    }
    catch (Exception e) {
        this.logger.error("Failed to invoke afterAckCallback", e);
    }
}

There really could be a race condition around the this.afterAckCallback property. One thread may pass the if() check, but then a different thread sets this.afterAckCallback to null before the first thread calls accept(), so we fail with that NPE. We have to copy its value to a local variable, and then do the null check and the accept() call on that local variable.
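A minimal sketch of that local-variable idea, using only the JDK. AckChannelSketch and its methods are hypothetical stand-ins for illustration, not the real PublisherCallbackChannelImpl, and this is not necessarily how the library ends up fixing it:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for the channel class; only the callback logic is modelled.
class AckChannelSketch {

    // volatile so that a write from another thread is visible to this one
    private volatile Consumer<AckChannelSketch> afterAckCallback;

    private final AtomicInteger pendingConfirms = new AtomicInteger();

    void setAfterAckCallback(Consumer<AckChannelSketch> callback) {
        this.afterAckCallback = callback;
    }

    void addPendingConfirm() {
        this.pendingConfirms.incrementAndGet();
    }

    void confirmReceived() {
        this.pendingConfirms.decrementAndGet();
    }

    int getPendingConfirmsCount() {
        return this.pendingConfirms.get();
    }

    // Returns true if this call invoked the callback.
    boolean invokeAfterAckCallback() {
        // Read the field exactly once; the null check and accept() below use
        // this local copy, so another thread nulling the field cannot cause an NPE.
        Consumer<AckChannelSketch> callback = this.afterAckCallback;
        if (callback != null && getPendingConfirmsCount() == 0) {
            callback.accept(this);
            this.afterAckCallback = null;
            return true;
        }
        return false;
    }
}
```

Note that the local copy only removes the NullPointerException. Two threads could still both read a non-null value and both invoke the callback, so whether that matters depends on what the callback does.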

Feel free to raise a GitHub issue against the Spring AMQP project: https://github.com/spring-projects/spring-amqp/issues

We have a race condition because we call doHandleConfirm(), with its async logic, from the loop in processMultipleAck(), so several of those async tasks can reach the afterAckCallback check at the same time.
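To illustrate the shape of the problem, here is a simplified, hypothetical model of a loop that hands each confirm to an executor. MultipleAckSketch and run() are made-up names, and the pending count is reduced to a single AtomicInteger, which is an assumption and not how the library tracks confirms. Instead of a plain field it uses AtomicReference.getAndSet(null), a different technique from the local-variable copy, so that at most one task can ever obtain the callback:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model: a loop submits one async task per confirm.
class MultipleAckSketch {

    // Simulates `confirms` acks handled on a pool of `threads` threads and
    // returns how many times the after-ack callback ran.
    static int run(int confirms, int threads) {
        AtomicInteger pending = new AtomicInteger(confirms);
        AtomicInteger callbackRuns = new AtomicInteger();
        Runnable countRun = callbackRuns::incrementAndGet;
        AtomicReference<Runnable> afterAckCallback = new AtomicReference<>(countRun);

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < confirms; i++) {   // stands in for the loop over confirms
                executor.execute(() -> {           // stands in for the async handler
                    if (pending.decrementAndGet() == 0) {
                        // Atomically take the callback and clear the reference.
                        Runnable callback = afterAckCallback.getAndSet(null);
                        if (callback != null) {
                            callback.run();
                        }
                    }
                });
            }
        }
        finally {
            executor.shutdown();
        }
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return callbackRuns.get();
    }
}
```

For example, MultipleAckSketch.run(50, 4) handles 50 simulated confirms on 4 threads and reports how many times the callback ran.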