I have a project that uses Spring AMQP. The RabbitMQ server is not mine, so I have no control over it. When my application starts, it creates a private response queue like this:
    @Bean(name="myAnonymousResponseQueue")
    public Queue myAnonymousResponseQueue()
    {
        Queue q = myAmqpAdmin().declareQueue();
        return q;
    }
And I have a SimpleMessageListenerContainer like this:
    @Bean
    public SimpleMessageListenerContainer myResponseMessageListenerContainer()
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(myConnectionFactory());
        container.setQueues(myAnonymousResponseQueue());
        container.setMessageListener(myRabbitTemplate());
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageConverter(myMessageConverter());
        container.setErrorHandler(myResponseErrorHandler());
        container.setAutoStartup(true);
        container.setRabbitAdmin(myAmqpAdmin());
        return container;
    }
I've had connection problems lately (ShutdownSignalException), and after a reconnect the private queue cannot be recreated. First, this is the connection error:
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:739)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:729)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at sun.security.ssl.InputRecord.readFully(Unknown Source)
at sun.security.ssl.InputRecord.read(Unknown Source)
at sun.security.ssl.SSLSocketImpl.readRecord(Unknown Source)
at sun.security.ssl.SSLSocketImpl.readDataRecord(Unknown Source)
at sun.security.ssl.AppInputStream.read(Unknown Source)
at java.io.BufferedInputStream.fill(Unknown Source)
at java.io.BufferedInputStream.read(Unknown Source)
at java.io.DataInputStream.readUnsignedByte(Unknown Source)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:538)
... 1 more
But once the connection has been restored, the private queue cannot be recreated:
AbstractConnectionFactory.java|291||Created new connection: SimpleConnection@289cc201 [delegate=amqp://[email protected]:50310/sob]
RabbitAdmin.java|442||Auto-declaring a non-durable, auto-delete, or exclusive Queue (amq.gen-SLigrYFVMvllGTS5m_3AzQ) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-SLigrYFVMvllGTS5m_3AzQ' in vhost 'sob', class-id=50, method-id=10)
BlockingQueueConsumer.java|565||Failed to declare queue:amq.gen-SLigrYFVMvllGTS5m_3AzQ
BlockingQueueConsumer.java|479||Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[amq.gen-SLigrYFVMvllGTS5m_3AzQ]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1171)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.queueDeclarePassive(PublisherCallbackChannelImpl.java:383)
at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:835)
at com.sun.proxy.$Proxy94.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550)
... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-SLigrYFVMvllGTS5m_3AzQ' in vhost 'sob', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-SLigrYFVMvllGTS5m_3AzQ' in vhost 'sob', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:556)
... 1 more
And at the end I have this:
com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:554)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:509)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:503)
at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.close(PublisherCallbackChannelImpl.java:642)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler$1.run(CachingConnectionFactory.java:946)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
So the AMQP connection is OK, but the private queue isn’t.
I do not understand why this happens. How can I obtain more information? Is it possible to recover from this error?
Thanks
---------------- (28/11/2016) -----------
The reason I define the anonymous queue this way is that with:
    @Bean(name="myAnonymousResponseQueue")
    public Queue myAnonymousResponseQueue()
    {
        return new AnonymousQueue(new Base64UrlNamingStrategy("amq.gen-"));
    }
the server does not allow me to create it, and I get the following error:
|28-11-2016 08:43:13.203|INFO |org.springframework.amqp.rabbit.core.RabbitAdmin|initialize|RabbitAdmin.java|493||Auto-declaring a non-durable, auto-delete, or exclusive Queue (amq.gen-JUYnhLX2SpqNoJT6ioB8GA) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-JUYnhLX2SpqNoJT6ioB8GA' in vhost 'sob', class-id=50, method-id=10)
|28-11-2016 08:43:13.486|WARN |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|attemptPassiveDeclarations|BlockingQueueConsumer.java|581||Failed to declare queue:amq.gen-JUYnhLX2SpqNoJT6ioB8GA
|28-11-2016 08:43:13.486|WARN |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|start|BlockingQueueConsumer.java|495||Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[amq.gen-JUYnhLX2SpqNoJT6ioB8GA]
|28-11-2016 08:43:28.868|WARN |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|attemptPassiveDeclarations|BlockingQueueConsumer.java|581||Failed to declare queue:amq.gen-JUYnhLX2SpqNoJT6ioB8GA
|28-11-2016 08:43:28.869|ERROR|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|run|SimpleMessageListenerContainer.java|1372||Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
So, what is the difference between AmqpAdmin().declareQueue() and AnonymousQueue? Is it possible that the broker does not permit the client to name the queue?
Now I think I understand the problem: my user can apparently only create queues whose names start with “amq.gen-”. If I try any other name, I get:
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue '4788a39b-fffe-4eae-b252-8d842234a018' in vhost 'sob' refused for user 'USER', class-id=50, method-id=10)
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq-_zoOEt5jTcqMduGWNyJ4Zg' in vhost 'sob' refused for user 'USER', class-id=50, method-id=10)
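A minimal sketch of that hypothesis, for illustration only. RabbitMQ grants configure, write, and read permissions per user and vhost as regular expressions matched against resource names. The pattern `^amq\.gen-.*` below is an assumption, since the real permissions on this broker are not visible from the client, and the class and method names are hypothetical. The sketch only checks which of the queue names from the logs above such a pattern would accept:

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spring AMQP or the RabbitMQ client.
class QueueNamePermissionSketch {

    // ASSUMPTION: the user's configure permission is a regex like this one.
    // The actual value on the broker is unknown.
    static final Pattern ASSUMED_CONFIGURE_PERMISSION = Pattern.compile("^amq\\.gen-.*");

    // Returns true if the assumed permission pattern would cover the queue name.
    static boolean mayConfigure(String queueName) {
        return ASSUMED_CONFIGURE_PERMISSION.matcher(queueName).find();
    }

    public static void main(String[] args) {
        String[] names = {
            "amq.gen-SLigrYFVMvllGTS5m_3AzQ",        // broker-generated name
            "4788a39b-fffe-4eae-b252-8d842234a018",  // UUID-style name
            "amq-_zoOEt5jTcqMduGWNyJ4Zg"             // "amq-" prefix without ".gen"
        };
        for (String name : names) {
            System.out.println(name + " -> " + (mayConfigure(name) ? "covered" : "not covered"));
        }
    }
}
```

Under this assumed pattern, only the first name is covered, which would be consistent with the two ACCESS_REFUSED errors above. It does not explain the 404 on the passive declaration of the 'amq.gen-' queue.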
So, if I can only use broker-generated queue names, and I need to redeclare the queue after a reconnection, what can I do?
Thank you again.
EDIT
I'm trying to apply the work-around. I have declared a ConnectionListener with:
    @Override
    public void onCreate(Connection arg0)
    {
        myResponseMessageListenerContainer.stop();
        String[] colaAnterior = myResponseMessageListenerContainer.getQueueNames();
        Queue q = myAmqpAdmin.declareQueue();
        q.setAdminsThatShouldDeclare(myAmqpAdmin);
        q.setShouldDeclare(true);
        myResponseMessageListenerContainer.addQueueNames(q.getName());
        myResponseMessageListenerContainer.removeQueueNames(colaAnterior);
        myResponseMessageListenerContainer.initialize();
        myResponseMessageListenerContainer.start();
    }
But now I have this error:
|30-11-2016 10:56:17.312|ERROR|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|redeclareElementsIfNecessary|SimpleMessageListenerContainer.java|1116||Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalStateException: Listener expects us to be listening on '[amq.gen-Xg5MG5n42ecpoW4-DA198A]'; our queues: [amq.gen-2qRLgfUmMxskshFCi1dzuA]
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:80)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:553)
at es.omie.amqp.config.listener.XBIDConnectionListener.onCreate(XBIDConnectionListener.java:57)
at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:553)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:500)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:474)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:467)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:97)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1084)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1394)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1370)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1346)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:335)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:1102)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1278)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Listener expects us to be listening on '[amq.gen-Xg5MG5n42ecpoW4-DA198A]'; our queues: [amq.gen-2qRLgfUmMxskshFCi1dzuA]
at org.springframework.util.Assert.state(Assert.java:392)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:770)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:550)
... 16 more
I keep trying, but any ideas on how to fix it?
Thanks
EDIT 2
Now I have this:
    @Override
    public void onCreate(Connection arg0)
    {
        myResponseMessageListenerContainer.stop();
        String[] colaAnterior = myResponseMessageListenerContainer.getQueueNames();
        Queue q = myAmqpAdmin.declareQueue();
        log.info(" ------ RESPONSE QUEUE -> OLD NAME: " + Arrays.asList(colaAnterior) + " NEW NAME: " + q.getName());
        myResponseMessageListenerContainer.addQueueNames(q.getName());
        myRabbitTemplate.setReplyAddress(q.getName());
        myRabbitTemplate.setQueue(q.getName());
        myResponseMessageListenerContainer.removeQueueNames(colaAnterior);
        myResponseMessageListenerContainer.shutdown();
        myResponseMessageListenerContainer.initialize();
        myResponseMessageListenerContainer.start();
    }
But the container does not keep the newly declared queue; it still tries to use the old one:
|30-11-2016 16:32:19.996|INFO |es.omie.amqp.config.listener.XBIDConnectionListener|onCreate|XBIDConnectionListener.java|42|| ------ RESPONSE QUEUE -> OLD NAME: [amq.gen-uTp6TCP66x2AlXUmQzqz8g] NEW NAME: amq.gen-DOwEn8WKz_9ymCBQFiMDNg
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-uTp6TCP66x2AlXUmQzqz8g' in vhost 'sob', class-id=50, method-id=10)
I never see the new queue name being used.
In the logs I can see the consumer being restarted before the connection is created:
Restarting Consumer: tags=[{amq.ctag-KYCURTkS4EevXHQtrpYV9Q=amq.gen-uTp6TCP66x2AlXUmQzqz8g}]
But there is nothing similar after the call to start().