1
votes

I'm trying to implement a RabbitMQ configuration that will allow me to use a fixed reply queue instead of having hundreds of temp queues appear. My first message that gets published, gets an immediate response via the reply queue, the second, third and sometimes even fifth message, just gives me a stacktrace saying Reply received after timeout. If I wait a bit, and send another message, I get a response again with any immediate consecutive messages failing again with the same error.

On the publisher side, I have the following configuration:

<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
        id="connectionFactory"
        port="${rabbit.port}"
        virtual-host="${rabbit.virtual}"
        host="${rabbit.host}"
        username="${rabbit.username}"
        password="${rabbit.password}"
        connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
        id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-timeout="${rabbit.rpc.timeout}"
        reply-queue="reply">
    <rabbit:reply-listener />
</rabbit:template>

<rabbit:queue id="reply" name="reply" />

On the consumer side I have the following configuration:

<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
        id="connectionFactory"
        port="${rabbit.port}"
        virtual-host="${rabbit.virtual}"
        host="${rabbit.host}"
        username="${rabbit.username}"
        password="${rabbit.password}"
        connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
        id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-timeout="${rabbit.rpc.timeout}"
        reply-queue="reply">
    <rabbit:reply-listener concurrency="${rabbit.consumers}" />
</rabbit:template>

<!-- Register Queue Listener Beans -->
<rabbit:listener-container
        connection-factory="connectionFactory"
        channel-transacted="true"
        requeue-rejected="true"
        concurrency="${rabbit.consumers}">
    <rabbit:listener queues="test" ref="TestProcessor" method="onMessage" />
</rabbit:listener-container>

<rabbit:queue id="test" name="test" />
<rabbit:queue id="reply" name="reply" />

I'm using spring-amqp 1.4.4 in case that's of any use:

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.4.4.RELEASE</version>
    </dependency>

This is how I build up my message and publish it:

MessageProperties properties = new MessageProperties();          
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(toJson(request).getBytes(), properties);
Message res = getTemplate().sendAndReceive(exchange, queue, message);

Template is simply an autowired instance of AmqpTemplate:

@Autowired
AmqpTemplate template;

First message gets an immediate response, the second message (and third and so forth) gets the following stacktrace on the consumer side:

2015-04-22 07:53:03,329 [SimpleAsyncTaskExecutor-1] WARN  org.springframework.amqp.rabbit.core.RabbitTemplate - Reply received after timeout for 4bfb2f6f-2e31-414c-9ec3-a4672e4c7e34
2015-04-22 07:53:03,330 [SimpleAsyncTaskExecutor-1] WARN  org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
    at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
    ... 10 more

... while the publisher just times out after not getting any response on the reply queue.

This is how I respond to a message on the consumer side:

   @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            ...
            System.out.println(message);
            // handle reply-to
            if (message.getMessageProperties() != null && message.getMessageProperties().getReplyTo() != null) {

                Message res = new Message(toJson(response).getBytes(), message.getMessageProperties());
                getTemplate().send("", message.getMessageProperties().getReplyTo(), res);

            }
        } catch (Exception e) {
            e.printStackTrace();
            // TODO: forward to exception queue here
        }    
    }

That System.out.println(message); prints the following:

(Body:'{"message":"Sent 'Test Text' on Wed Apr 22 08:17:13 SAST 2015"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=[56, 50, 98, 100, 100, 56, 53, 54, 45, 57, 101, 100, 102, 45, 52, 99, 54, 97, 45, 97, 55, 51, 101, 45, 102, 54, 48, 101, 50, 49, 48, 53, 55, 101, 97, 48], replyTo=reply, contentType=application/json, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0])

Any ideas?

2

2 Answers

2
votes

You have 2 rabbit templates, each using the same reply queue - so the "second" reply is being "received" by the consumer-side template (hence the log message because it received a "reply" when there is no outstanding request waiting for a reply - that's over on the producer side).

Note that, since rabbitmq 3.4, it's generally better to use the new rabbit built-in direct reply-to feature; it generally solves all the reasons for which we had to implement the fixed reply-to queue mechanism. Support for direct reply-to was adding in Spring AMQP 1.4.1.RELEASE.

0
votes

After days of meddling around the only thing I understood with sendAndReceive() of rabit template is to never to meddle with Binding keys and let the framework set it to the queue name. It works perfectly in that sense but if I use my brains over setting it, lot of things can go wrong.

Right now I am stuck with getting the correlation-id and the one i receive back is not the same as to what i sent. How is this possible ?