I have a stream of incoming data that is sent to RabbitMQ as individual messages.
I want to send these to a service that requires a batch of messages. I need to send the request to the service when I either have a batch of 1000 messages or when 5 seconds have expired. Is this possible using SimpleMessageListenerContainer?
The SimpleMessageListenerContainer supports transactions, however this won't help with the 5 second timeout. I did look at the method doReceiveAndExecute(BlockingQueueConsumer consumer) and "receiveTimeout", but as this variable is inside the transaction loop I could end up waiting 5 seconds per message (1000*5 sec= 83 min).
I currently have a channel aware listener that batches the messages into a bulk processor that will manage my timeouts and queue length. The SimpleMessageListenerContainer is set to manual ack. However as the listener returns before the message has actually been sent to the service I occasionally have issues when I do come to ack the message as the channel has been closed.
I have thought about writing my own ListenerContainer that sends the whole BlockingQueueConsumer to the Listener. Is this the only solution or has anyone managed to do something similar already?