1
votes

I have a stream of incoming data that is sent to RabbitMQ as individual messages.

I want to send these to a service that requires a batch of messages. I need to send the request to the service when I either have a batch of 1000 messages or when 5 seconds have expired. Is this possible using SimpleMessageListenerContainer?

The SimpleMessageListenerContainer supports transactions, however this won't help with the 5 second timeout. I did look at the method doReceiveAndExecute(BlockingQueueConsumer consumer) and "receiveTimeout", but as this variable is inside the transaction loop I could end up waiting 5 seconds per message (1000*5 sec= 83 min).

I currently have a channel aware listener that batches the messages into a bulk processor that will manage my timeouts and queue length. The SimpleMessageListenerContainer is set to manual ack. However as the listener returns before the message has actually been sent to the service I occasionally have issues when I do come to ack the message as the channel has been closed.

I have thought about writing my own ListenerContainer that sends the whole BlockingQueueConsumer to the Listener. Is this the only solution or has anyone managed to do something similar already?

1

1 Answers

3
votes

You can use a ChannelAwareMessageListener, set acknowledgeMode=MANUAL; accumulate the deliveries in the listener; start a timer (scheduled task) to execute in +5 seconds and keep a reference to the channel. When a new delivery arrives, cancel the task, add the new delivery to the collection.

When 1000 deliveries arrive (or the scheduled task fires); invoke your service; then use channel.basicAck() (multiple) to ack the processed messages.

You'll have some race conditions to deal with but it should be pretty easy. Perhaps another queue of batches would be easiest with some other thread waiting for batches to arrive in that queue.

EDIT

As of 2.2, the SimpleMessageListenerContainer supports delivering batches of messages natively - see Batched Messages.

Starting with version 2.2, the SimpleMessageListeneContainer can be use to create batches on the consumer side (where the producer sent discrete messages).

Set the container property consumerBatchEnabled to enable this feature. deBatchingEnabled must also be true so that the container is responsible for processing batches of both types. Implement BatchMessageListener or ChannelAwareBatchMessageListener when consumerBatchEnabled is true. See @RabbitListener with Batching for information about using this feature with @RabbitListener.