1
votes

I'm using the AmqpTemplate.sendAndReceive func from several servers, it works and the reconnection works after rabbitmq restarts or network problems.

It worked great for several weeks but suddenly one of my servers is getting timeout for every sendAndReceive call, the rabbitmq is getting the message and it being processed but sendAndReceive don't get the response (the reply-timeout is set to 60 sec and it takes only few sec to process the message). Other servers worked at the same time with the same code and using the same queue.

The server returned to work only after I restarted the service on it.

I think it's some reconnection problem (even though the messages sent to rabbitmq successfully), maybe the AmqpTemplate response listener didn't reconnect or something.

Anyone knows what might be the problem? and how I can prevent it to happen again?

my ConnectionFactory setting:

setConnectionTimeout(1000);
setRequestedHeartbeat(100);
setTopologyRecoveryEnabled(true);
setAutomaticRecoveryEnabled(true);

Edit:

Spring-AMQP version: 1.4.3

<bean id="myConnectionFactory" class="path.to.myConnectionFactoryClass"></bean>

<rabbit:connection-factory id="myRabbitConnectionFactory" connection-factory="myConnectionFactory" channel-cache-size="25" />

<rabbit:template id="myTemplate" connection-factory="myRabbitConnectionFactory" reply-timeout="65000" />

Edit:

It happened again, I see that it stop working after I get the error:

org.springframework.amqp.AmqpConnectException: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:51)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:110)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1051)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:894)
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)
at MyClass.onMessage(MyClass.java:1234)
at sun.reflect.GeneratedMethodAccessor3558.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy95.onMessage(Unknown Source)
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:237)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$100(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$2.doInTransaction(SimpleMessageListenerContainer.java:975)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$2.doInTransaction(SimpleMessageListenerContainer.java:968)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:968)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:174)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:496)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:96)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:42)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:747)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:736)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:416)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:392)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$500(CachingConnectionFactory.java:75)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:623)
at com.sun.proxy.$Proxy74.basicCancel(Unknown Source)
at org.springframework.amqp.rabbit.core.RabbitTemplate$7.doInRabbit(RabbitTemplate.java:944)
at org.springframework.amqp.rabbit.core.RabbitTemplate$7.doInRabbit(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1045)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:902)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:894)
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)

Not sure why the connection closed and why it didn't reconnect.

2
Please provide complete configuration and Spring AMQP version. Spring AMQP has its own connection recovery mechanism (which predates the rabbit-client version by a long time). Versions prior to 1.4 are not compatible with rabbitmq automaticRecoveryEnabled.Gary Russell
Thanks Gary for your comment, I'm using v1.4.3 and I edited the post with more config info.Ido Ganzer
It's a pretty simple setup; there's nothing to "recover" on the receiving side since it's using direct reply-to (or a temporary queue, depending on the rabbitmq version) for the replies; do you see anything useful in the logs? (this application and/or rabbitmq logs).Gary Russell
It happened again, I see that it stop working after I get the error: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFExceptionIdo Ganzer
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:110) at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1051)Ido Ganzer

2 Answers

0
votes

at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)

at MyClass.onMessage(MyClass.java:1234) ...

So this is not recovery of the server-side; your MyClass is invoking another rabbit template send and receive operation after receiving the message. So it is acting as a client in this situation.

However, normally, this exception would be thrown to the container, and rabbitmq will resubmit the message (unless you have acknowledge mode NONE), as long as your listener throws the exception to the container. If your listener catches the exception and does nothing with it except logging, you will see this result.

If you don't want rabbit to resubmit the failed message, or the inbound container does not acknowledge messages, you can configure a RetryTemplate into the outbound rabbit template to recover the connection; see the documentation.

If this doesn't explain your situation, you need to show the complete configuration, the code in MyClass.onMessage() and a complete log.

0
votes

I think it being solved. I didn't catch an exception in OnMesaage, and I believe it killed my listeners.