I'm trying to implement a RabbitMQ configuration that will allow me to use a fixed reply queue instead of having hundreds of temp queues appear.
The first message I publish gets an immediate response via the reply queue. The second, third and sometimes even the fifth message just produce a stack trace saying Reply received after timeout. If I wait a bit and send another message, I get a response again, but any immediately following messages fail with the same error.
On the publisher side, I have the following configuration:
<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
    id="connectionFactory"
    port="${rabbit.port}"
    virtual-host="${rabbit.virtual}"
    host="${rabbit.host}"
    username="${rabbit.username}"
    password="${rabbit.password}"
    connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
    id="amqpTemplate"
    connection-factory="connectionFactory"
    reply-timeout="${rabbit.rpc.timeout}"
    reply-queue="reply">
    <rabbit:reply-listener />
</rabbit:template>
<rabbit:queue id="reply" name="reply" />
On the consumer side I have the following configuration:
<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
    id="connectionFactory"
    port="${rabbit.port}"
    virtual-host="${rabbit.virtual}"
    host="${rabbit.host}"
    username="${rabbit.username}"
    password="${rabbit.password}"
    connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
    id="amqpTemplate"
    connection-factory="connectionFactory"
    reply-timeout="${rabbit.rpc.timeout}"
    reply-queue="reply">
    <rabbit:reply-listener concurrency="${rabbit.consumers}" />
</rabbit:template>
<!-- Register Queue Listener Beans -->
<rabbit:listener-container
    connection-factory="connectionFactory"
    channel-transacted="true"
    requeue-rejected="true"
    concurrency="${rabbit.consumers}">
    <rabbit:listener queues="test" ref="TestProcessor" method="onMessage" />
</rabbit:listener-container>
<rabbit:queue id="test" name="test" />
<rabbit:queue id="reply" name="reply" />
I'm using spring-amqp 1.4.4 in case that's of any use:
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.4.RELEASE</version>
</dependency>
This is how I build up my message and publish it:
MessageProperties properties = new MessageProperties();
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(toJson(request).getBytes(), properties);
Message res = getTemplate().sendAndReceive(exchange, queue, message);
Template is simply an autowired instance of AmqpTemplate:
@Autowired
AmqpTemplate template;
The first message gets an immediate response. The second message (and the third, and so forth) produces the following stack trace on the consumer side:
2015-04-22 07:53:03,329 [SimpleAsyncTaskExecutor-1] WARN org.springframework.amqp.rabbit.core.RabbitTemplate - Reply received after timeout for 4bfb2f6f-2e31-414c-9ec3-a4672e4c7e34
2015-04-22 07:53:03,330 [SimpleAsyncTaskExecutor-1] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
... 10 more
... while the publisher just times out after not getting any response on the reply queue.
This is how I respond to a message on the consumer side:
@Override
public void onMessage(Message message, Channel channel) throws Exception {
    try {
        ...
        System.out.println(message);
        // handle reply-to
        if (message.getMessageProperties() != null && message.getMessageProperties().getReplyTo() != null) {
            Message res = new Message(toJson(response).getBytes(), message.getMessageProperties());
            getTemplate().send("", message.getMessageProperties().getReplyTo(), res);
        }
    } catch (Exception e) {
        e.printStackTrace();
        // TODO: forward to exception queue here
    }
}
That System.out.println(message); prints the following:
(Body:'{"message":"Sent 'Test Text' on Wed Apr 22 08:17:13 SAST 2015"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=[56, 50, 98, 100, 100, 56, 53, 54, 45, 57, 101, 100, 102, 45, 52, 99, 54, 97, 45, 97, 55, 51, 101, 45, 102, 54, 48, 101, 50, 49, 48, 53, 55, 101, 97, 48], replyTo=reply, contentType=application/json, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0])
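In case it helps with reading that output: the correlationId is printed as a raw byte array. Here is a minimal sketch that turns it back into text, assuming the bytes are UTF-8 (the class and method names are made up for illustration only). It gives a UUID-style string of the same form as the id in the "Reply received after timeout" warning:

```java
import java.nio.charset.StandardCharsets;

public class CorrelationIdDecoder {

    // Hypothetical helper: converts the printed byte values back into text,
    // assuming the correlation id was written as UTF-8.
    static String decode(int[] values) {
        byte[] bytes = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            bytes[i] = (byte) values[i];
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Values copied from the correlationId field in the printed message.
        int[] correlationId = {
            56, 50, 98, 100, 100, 56, 53, 54, 45, 57, 101, 100, 102, 45,
            52, 99, 54, 97, 45, 97, 55, 51, 101, 45,
            102, 54, 48, 101, 50, 49, 48, 53, 55, 101, 97, 48
        };
        // prints 82bdd856-9edf-4c6a-a73e-f60e21057ea0
        System.out.println(decode(correlationId));
    }
}
```

So the request does reach the consumer with both a correlation id and replyTo=reply set.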
Any ideas?