My use case requires waiting 2 hours before consuming messages from an AMQP queue (we use RabbitMQ).

EDIT: To clarify my use case... I need each individual message to wait 2 hours before being read. E.g. Message 1 arrives at 10:00 am and Message 2 arrives at 10:15 am. I need Message 1 to be read at 12:00 pm and Message 2 to be read at 12:15 pm.

We are using Spring Integration 3.x.

The int-amqp:inbound-channel-adapter is message driven and doesn't have a polling option from what I can find.

A couple things I've thought of:

Any suggestions?

3 Answers

1
votes

We don't currently have a polling inbound adapter. #1 is easy. For #2, the simplest would be to use a RabbitTemplate and invoke receive() from an inbound-channel-adapter in a POJO.
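
For #2, a minimal sketch of the polled approach might look like the following. It assumes the usual `int` prefix for the core Spring Integration namespace, a RabbitTemplate bean named rabbitTemplate, a queue named myQueue, and a channel named fromRabbit; all of those names and the one-minute interval are placeholders, not anything from the question. It calls receiveAndConvert() through a SpEL expression instead of a POJO only to keep the example short.

```xml
<!-- Hypothetical names: rabbitTemplate, myQueue, fromRabbit -->
<int:inbound-channel-adapter channel="fromRabbit"
    expression="@rabbitTemplate.receiveAndConvert('myQueue')">
  <!-- Polls once a minute; a null result (empty queue) produces no message -->
  <int:poller fixed-delay="60000"/>
</int:inbound-channel-adapter>
```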

I would go with #1; you don't need Quartz. You can use a simple Spring scheduled task and a control bus to start the adapter.
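
A rough sketch of the scheduled start, under some assumptions: the adapter id amqpIn, the channel names, the queue name, and the noon cron expression are all made up for illustration. Here a polled inbound-channel-adapter with a cron trigger plays the role of the scheduled task by sending a command string to the control bus.

```xml
<!-- The AMQP adapter stays stopped until the control bus starts it -->
<int-amqp:inbound-channel-adapter id="amqpIn"
    queue-names="myQueue"
    channel="fromRabbit"
    auto-startup="false"/>

<int:channel id="controlChannel"/>
<int:control-bus input-channel="controlChannel"/>

<!-- Sends the literal command "@amqpIn.start()" to the control bus
     at 12:00:00 every day (hypothetical schedule) -->
<int:inbound-channel-adapter channel="controlChannel"
    expression="'@amqpIn.start()'">
  <int:poller cron="0 0 12 * * *"/>
</int:inbound-channel-adapter>
```

Note that this starts consumption at a fixed time; on its own it does not give each message its own 2-hour delay.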

1
votes

Another trick is to use a PollableAmqpChannel:

<int-amqp:channel id="myQueueName" message-driven="false"/>

and provide a <poller> for the subscriber to that channel.

There is no reason to send messages to that channel (because you will poll messages from the Rabbit queue) and, yes, it looks like an anti-pattern, but it is a hook that avoids any workarounds with direct RabbitTemplate usage via SpEL.
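
As a sketch of what the polled subscriber could look like, assuming a handler bean named myHandler with a handle method (both hypothetical) and an arbitrary one-minute poll interval:

```xml
<int-amqp:channel id="myQueueName" message-driven="false"/>

<!-- Hypothetical subscriber: myHandler and handle are placeholder names -->
<int:service-activator input-channel="myQueueName"
    ref="myHandler" method="handle">
  <!-- Each poll tries to receive one message from the channel's queue -->
  <int:poller fixed-delay="60000"/>
</int:service-activator>
```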

UPDATE

<delayer> can help you, but it depends on your requirements. If you don't want to poll messages from RabbitMQ, you should use the workaround above. But if you just don't want to process a message until some time has elapsed, you can simply 'delay' it for that time.

Don't forget to add a persistent message-store to avoid losing messages if the application fails unexpectedly during that period.
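
A minimal sketch of a 2-hour delayer backed by a persistent store might look like this. The channel names, the bean ids, and the choice of a JDBC store with a dataSource bean are assumptions for illustration; any persistent MessageStore implementation should work in the same place.

```xml
<!-- Holds each message for 7200000 ms (2 hours) before releasing it -->
<int:delayer id="twoHourDelayer"
    input-channel="fromRabbit"
    output-channel="processChannel"
    default-delay="7200000"
    message-store="messageStore"/>

<!-- Hypothetical persistent store; assumes a configured dataSource bean
     and the Spring Integration JDBC schema -->
<bean id="messageStore"
    class="org.springframework.integration.jdbc.JdbcMessageStore">
  <constructor-arg ref="dataSource"/>
</bean>
```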

0
votes

FYI, here is how I solved the issue (I used solution #3):

<rabbit:queue name="delayQueue" durable="true">
  <rabbit:queue-arguments>
    <entry key="x-message-ttl">
      <value type="java.lang.Long">7200000</value>
    </entry>
    <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
    <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
  </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange name="finalDestinationTopic">
  <rabbit:bindings>
    <rabbit:binding queue="finalDestinationQueue" pattern="finalDestinationQueue"/>
  </rabbit:bindings>
</rabbit:topic-exchange>
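
The configuration above gives each message its own 2-hour wait: a message sits in delayQueue until its TTL of 7200000 ms expires, and is then dead-lettered to finalDestinationTopic with routing key finalDestinationQueue. The remaining wiring is not shown in the answer, so the following is only a guess at how it could look. The channel names, the rabbitTemplate and connectionFactory beans, and the durable flag on the final queue are assumptions.

```xml
<!-- The queue that the topic-exchange binding refers to (assumed durable) -->
<rabbit:queue name="finalDestinationQueue" durable="true"/>

<!-- Producers publish to delayQueue via the default exchange;
     nothing consumes from delayQueue directly -->
<int-amqp:outbound-channel-adapter channel="toDelay"
    amqp-template="rabbitTemplate"
    routing-key="delayQueue"/>

<!-- The normal message-driven adapter reads expired messages -->
<int-amqp:inbound-channel-adapter channel="processChannel"
    queue-names="finalDestinationQueue"
    connection-factory="connectionFactory"/>
```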