I am testing the following scenario in Spring AMQP v1.4.2 and it fails to reconnect after a network disruption:
- Start the spring application which consumes messages asyncly using rabbit:listener-container and rabbit:connection-factory (detailed configuration follows).
- The log shows that application is successfully receiving messages.
- Make RabbitMQ invisible to the app by dropping inbound network traffic on rabbit server:
sudo iptables -A INPUT -p tcp --destination-port 5672 -j DROP
- Wait for at least 3 minutes (for network connections to timeout).
- Fix the connection with:
sudo iptables -D INPUT -p tcp --destination-port 5672 -j DROP
- Wait for some time (even tried more than an hour) and no reconnection happens.
- Restart the application and it starts receiving messages again, which means network was back to normal.
I have also tested the same scenario with VM network adapter disconnects instead of iptables drop and the same thing happens, i.e. no automatic reconnection. Interestingly enough, when I try iptables REJECT, instead of DROP, it works as expected and the app restarts as soon as I remove the reject rule but I think reject is more like a server failure than a network failure.
According to the reference document:
If a MessageListener fails because of a business exception, the exception is handled by the message listener container and then it goes back to listening for another message. If the failure is caused by a dropped connection (not a business exception), then the consumer that is collecting messages for the listener has to be cancelled and restarted. The SimpleMessageListenerContainer handles this seamlessly, and it leaves a log to say that the listener is being restarted. In fact it loops endlessly trying to restart the consumer, and only if the consumer is very badly behaved indeed will it give up. One side effect is that if the broker is down when the container starts, it will just keep trying until a connection can be established.
This is the log that I get about a minute after disconnection:
2015-01-16 14:00:42,433 WARN [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
... 1 common frames omitted
And I get this log message a few seconds after the reconnection:
2015-01-16 14:18:14,551 WARN [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out
UPDATE: Quite strangely, when I enable DEBUG logging on org.springframework.amqp package, the reconnection happens successfully and I cannot reproduce the issue anymore!
Without debug logging enabled, I tried to debug the spring AMQP code. I observed that soon after iptables drop is removed, SimpleMessageListenerContainer.doStop()
method is called which it turn calls shutdown() and cancels all of the channels. I also got this log message when I put a breakpoint on doStop() which seems to be related to the cause:
2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer
UPDATE 2: After setting requested-heartbeat
to 30 seconds, as suggested in an answer, the reconnection worked most of the times and succeeded in redefining the exclusive temporary queue, bound to a fanout exchange, but it still fails to reconnect occasionally.
In the rare cases that it failed, I monitored RabbitMQ management console during the test and observed that a new connection was established (after the old connection was removed by timeout) but the exclusive temporary queue was not redefined after reconnection. Also the client was not receiving any messages. It is now really hard to reproduce the issue reliably as it happens less often. I have provided the full configuration below, now containing the queue declarations.
UPDATE 3: Even after replacing the exclusive temporary queue with an auto-delete named queue, the same behaviour occurs occasionally; i.e. the auto-delete named queue is not redefined after reconnection and no messages are being received until application is restarted.
I would really appreciate a lot if someone can help me on this.
Here is the spring AMQP configuration that I am relying on:
<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>
<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
<rabbit:bindings>
<rabbit:binding queue="control-queue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
acknowledge="none"
concurrency="1"
prefetch="1">
<rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>
</rabbit:listener-container>
<rabbit:connection-factory id="connection-factory"
username="${rabbit.username}"
password="${rabbit.password}"
host="${rabbit.host}"
virtual-host="${rabbit.virtualhost}"
publisher-confirms="true"
channel-cache-size="100"
requested-heartbeat="30" />
<rabbit:admin id="admin" connection-factory="connection-factory"/>
<rabbit:queue id="qu0-id" name="qu0">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="dead-letter"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
<rabbit:bindings>
<rabbit:binding queue="qu0" pattern="p.0"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:listener-container connection-factory="connection-factory"
acknowledge="manual"
concurrency="4"
prefetch="30">
<rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>
org.springframework.amqp.rabbit.listener
category in theDEBUG
level to see more info on the matter? BTW I just have tried similar (or not ?) emilation with thetcpTrace
on Windows and see similarCaused by: java.io.EOFException: null at java.io.DataInputStream.readUnsignedByte
in logs. But when I restarttrace
the connection is restored. My AMQP Client is3.4.2
- transitive dependency from Spring AMQP. – Artem Bilan