15
votes

I am testing the following scenario in Spring AMQP v1.4.2 and it fails to reconnect after a network disruption:

  1. Start the Spring application, which consumes messages asynchronously using rabbit:listener-container and rabbit:connection-factory (detailed configuration follows).
  2. The log shows that the application is successfully receiving messages.
  3. Make RabbitMQ invisible to the app by dropping inbound network traffic on the RabbitMQ server: sudo iptables -A INPUT -p tcp --destination-port 5672 -j DROP
  4. Wait for at least 3 minutes (for network connections to time out).
  5. Fix the connection with: sudo iptables -D INPUT -p tcp --destination-port 5672 -j DROP
  6. Wait for some time (I even tried more than an hour); no reconnection happens.
  7. Restart the application and it starts receiving messages again, which means the network was back to normal.

I have also tested the same scenario with VM network adapter disconnects instead of iptables drop and the same thing happens, i.e. no automatic reconnection. Interestingly enough, when I try iptables REJECT, instead of DROP, it works as expected and the app restarts as soon as I remove the reject rule but I think reject is more like a server failure than a network failure.

According to the reference document:

If a MessageListener fails because of a business exception, the exception is handled by the message listener container and then it goes back to listening for another message. If the failure is caused by a dropped connection (not a business exception), then the consumer that is collecting messages for the listener has to be cancelled and restarted. The SimpleMessageListenerContainer handles this seamlessly, and it leaves a log to say that the listener is being restarted. In fact it loops endlessly trying to restart the consumer, and only if the consumer is very badly behaved indeed will it give up. One side effect is that if the broker is down when the container starts, it will just keep trying until a connection can be established.

This is the log that I get about a minute after disconnection:

    2015-01-16 14:00:42,433 WARN  [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
    ... 1 common frames omitted

And I get this log message a few seconds after the reconnection:

2015-01-16 14:18:14,551 WARN  [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out

UPDATE: Quite strangely, when I enable DEBUG logging on org.springframework.amqp package, the reconnection happens successfully and I cannot reproduce the issue anymore!

Without debug logging enabled, I tried to debug the Spring AMQP code. I observed that soon after the iptables DROP rule is removed, the SimpleMessageListenerContainer.doStop() method is called, which in turn calls shutdown() and cancels all of the channels. I also got the following log messages when I put a breakpoint on doStop(); they seem to be related to the cause:

2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    ... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer

UPDATE 2: After setting requested-heartbeat to 30 seconds, as suggested in an answer, the reconnection worked most of the time and succeeded in redefining the exclusive temporary queue, bound to a fanout exchange, but it still occasionally fails to reconnect.

In the rare cases that it failed, I monitored RabbitMQ management console during the test and observed that a new connection was established (after the old connection was removed by timeout) but the exclusive temporary queue was not redefined after reconnection. Also the client was not receiving any messages. It is now really hard to reproduce the issue reliably as it happens less often. I have provided the full configuration below, now containing the queue declarations.

UPDATE 3: Even after replacing the exclusive temporary queue with an auto-delete named queue, the same behaviour occurs occasionally; i.e. the auto-delete named queue is not redefined after reconnection and no messages are received until the application is restarted.

I would really appreciate it if someone could help me with this.

Here is the spring AMQP configuration that I am relying on:

<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>

<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
    <rabbit:bindings>
        <rabbit:binding queue="control-queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="none"
                           concurrency="1"
                           prefetch="1">
    <rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>

</rabbit:listener-container>

<rabbit:connection-factory id="connection-factory"
                           username="${rabbit.username}"
                           password="${rabbit.password}"
                           host="${rabbit.host}"
                           virtual-host="${rabbit.virtualhost}"
                           publisher-confirms="true" 
                           channel-cache-size="100"
                           requested-heartbeat="30" />

<rabbit:admin id="admin" connection-factory="connection-factory"/>

<rabbit:queue id="qu0-id" name="qu0">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dead-letter"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
    <rabbit:bindings>
        <rabbit:binding queue="qu0" pattern="p.0"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="manual"
                           concurrency="4"
                           prefetch="30">
    <rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>
3
Do you mean that there is no such issue with earlier Spring AMQP versions? - Artem Bilan
Would you mind sharing the logs for the org.springframework.amqp.rabbit.listener category at the DEBUG level to see more info on the matter? BTW, I have just tried a similar (or not?) emulation with tcpTrace on Windows and I see a similar Caused by: java.io.EOFException: null at java.io.DataInputStream.readUnsignedByte in the logs. But when I restart the trace, the connection is restored. My AMQP Client is 3.4.2, a transitive dependency from Spring AMQP. - Artem Bilan
Not specific to Spring AMQP, but you might try using Lyra instead if the ability to reconnect and recover resources like queues is something you're after. - Jonathan

3 Answers

6
votes

I just ran your test as described (rabbit on linux using iptables to drop packets).

There is no log message when the connection is reestablished (perhaps there should be).

I suggest you turn on debug logging to see the reconnection.

EDIT:

From the rabbitmq documentation:

exclusive: Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed.

From your exception:

reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-

So the problem is the broker still thinks the other connection exists.

  1. Don't use an exclusive queue (you will lose messages with such a queue anyway). Or,
  2. Set a low requestedHeartbeat so the broker will detect the lost connection faster.
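
To make the timing argument concrete, here is a minimal sketch of the situation, assuming a toy broker that frees an exclusive queue only once it has noticed that the owning connection went silent. Everything in it (ToyBroker, deadAfterSeconds, firstSuccessfulRedeclare, the connection ids) is hypothetical and only models the idea; it is not RabbitMQ's or Spring AMQP's real implementation, and the actual relationship between the requested heartbeat and the detection delay should be checked against the RabbitMQ documentation.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a broker that tracks which connection owns each exclusive queue. */
public class ExclusiveQueueModel {

    /** Hypothetical stand-in for the broker; not RabbitMQ's real implementation. */
    static final class ToyBroker {
        private final Map<String, String> exclusiveOwner = new HashMap<>(); // queue -> connection id
        private final Map<String, Long> lastSeenSeconds = new HashMap<>();  // connection id -> last traffic
        private final long deadAfterSeconds; // assumed delay before a silent connection is dropped

        ToyBroker(long deadAfterSeconds) {
            this.deadAfterSeconds = deadAfterSeconds;
        }

        /** Returns true on success, false for a RESOURCE_LOCKED-style refusal. */
        boolean declareExclusive(String queue, String connectionId, long nowSeconds) {
            reapSilentConnections(nowSeconds);
            lastSeenSeconds.put(connectionId, nowSeconds);
            String owner = exclusiveOwner.get(queue);
            if (owner != null && !owner.equals(connectionId)) {
                return false; // the broker still believes the old connection exists
            }
            exclusiveOwner.put(queue, connectionId);
            return true;
        }

        /** Forgets connections that have been silent for too long, and their exclusive queues. */
        private void reapSilentConnections(long nowSeconds) {
            lastSeenSeconds.entrySet().removeIf(e -> nowSeconds - e.getValue() > deadAfterSeconds);
            exclusiveOwner.values().removeIf(owner -> !lastSeenSeconds.containsKey(owner));
        }
    }

    /** Time (in seconds after the outage starts) at which a new connection first gets the queue. */
    static long firstSuccessfulRedeclare(long deadAfterSeconds, long retryEverySeconds) {
        ToyBroker broker = new ToyBroker(deadAfterSeconds);
        broker.declareExclusive("control-queue", "old-connection", 0); // old connection goes silent at t=0
        for (long t = retryEverySeconds; ; t += retryEverySeconds) {
            if (broker.declareExclusive("control-queue", "new-connection", t)) {
                return t;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccessfulRedeclare(60, 5));  // prints 65
        System.out.println(firstSuccessfulRedeclare(600, 5)); // prints 605
    }
}
```

In this model every attempt made before the broker gives up on the old connection is refused, so a client that retries only a few times a few seconds apart (as in the log above, where the attempts are 5 seconds apart and end at "retries left=0") stops long before the lock is released. Shortening the detection delay, or not using an exclusive queue at all, removes that window.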
3
votes

We are also facing this issue in our production environment, possibly because our Rabbit nodes run as VMs on different ESX racks. The workaround we found was to have our client app keep trying to reconnect if it gets disconnected from the cluster. Below are the settings we applied, and they worked:

<util:properties id="spring.amqp.global.properties">
  <prop key="smlc.missing.queues.fatal">false</prop>
</util:properties>

This attribute changes the global behavior of Spring AMQP when declaring queues fails with fatal errors (broker not available, etc.). By default the container tries only 3 times (see the log message showing "retries left=0").

Ref: http://docs.spring.io/spring-amqp/reference/htmlsingle/#containerAttributes

In addition, we set recovery-interval so that the container recovers from non-fatal errors. The same interval is also used when the global behavior is to retry on fatal errors too (such as missing queues).

<rabbit:listener-container recovery-interval="15000" connection-factory="consumerConnectionFactory">
....
</rabbit:listener-container>
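
The difference between the two modes can be sketched as a small simulation. This is a simplified model of the behaviour described above, not Spring AMQP's actual code: DeclarationRetryModel, start, Outcome and maxRestartsInModel are hypothetical names, and the real container sleeps between attempts and restarts, which the model skips.

```java
import java.util.function.IntPredicate;

/** Simplified model of consumer start-up retries; NOT Spring AMQP's real code. */
public class DeclarationRetryModel {

    enum Outcome { CONSUMING, CONTAINER_STOPPED, MODEL_CAP_REACHED }

    static final class Result {
        final Outcome outcome;
        final int attempts;

        Result(Outcome outcome, int attempts) {
            this.outcome = outcome;
            this.attempts = attempts;
        }
    }

    /**
     * @param queueUsableOnAttempt whether the queue declaration would succeed on attempt n (1-based)
     * @param retriesPerStart      declaration attempts per consumer start (3 in the answer above)
     * @param missingQueuesFatal   true: stop the container once the retries are used up
     * @param maxRestartsInModel   cap that only exists so this simulation always terminates
     */
    static Result start(IntPredicate queueUsableOnAttempt, int retriesPerStart,
                        boolean missingQueuesFatal, int maxRestartsInModel) {
        int attempts = 0;
        for (int restart = 0; restart <= maxRestartsInModel; restart++) {
            for (int retry = 0; retry < retriesPerStart; retry++) {
                attempts++;
                if (queueUsableOnAttempt.test(attempts)) {
                    return new Result(Outcome.CONSUMING, attempts);
                }
                // a real container would wait here before the next declaration attempt
            }
            if (missingQueuesFatal) {
                // corresponds to "Stopping container from aborted consumer" in the question's log
                return new Result(Outcome.CONTAINER_STOPPED, attempts);
            }
            // non-fatal mode: a real container would wait for recovery-interval, then restart the consumer
        }
        return new Result(Outcome.MODEL_CAP_REACHED, attempts);
    }

    public static void main(String[] args) {
        // Assume the queue only becomes usable again on the 10th attempt.
        IntPredicate usableFromTenthAttempt = attempt -> attempt >= 10;
        Result fatal = start(usableFromTenthAttempt, 3, true, 100);
        Result nonFatal = start(usableFromTenthAttempt, 3, false, 100);
        System.out.println(fatal.outcome + " after " + fatal.attempts + " attempts");
        // prints "CONTAINER_STOPPED after 3 attempts"
        System.out.println(nonFatal.outcome + " after " + nonFatal.attempts + " attempts");
        // prints "CONSUMING after 10 attempts"
    }
}
```

With the fatal setting the model stops for good after the third failed attempt, which matches what the question describes: nothing is received until the application is restarted. With the non-fatal setting it keeps cycling until the queue becomes usable.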
1
votes

Call setRequestedHeartBeat on the ConnectionFactory and setMissingQueuesFatal(false) on the SimpleMessageListenerContainer so that it keeps retrying to connect indefinitely. By default, missingQueuesFatal is true on SimpleMessageListenerContainer and only 3 retries will be done.

  @Bean
  public ConnectionFactory connectionFactory() {
    final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHost(), getPort());
    connectionFactory.setUsername(getUsername());
    connectionFactory.setPassword(getPassword());
    connectionFactory.setVirtualHost(getVirtualHost());
    // Heartbeat in seconds, so that a dead connection is detected sooner
    connectionFactory.setRequestedHeartBeat(30);
    return connectionFactory;
  }

  @Bean
  public SimpleMessageListenerContainer listenerContainerCopernicusErrorQueue() {
    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames(myQueue().getName());
    container.setMessageListener(messageListenerAdapterQueue());
    container.setDefaultRequeueRejected(false);
    // Keep retrying instead of stopping the container when the queue cannot be declared
    container.setMissingQueuesFatal(false);
    return container;
  }