I'm implementing a bolt in Storm that receives messages from a RabbitMQ spout (https://github.com/ppat/storm-rabbitmq).
Each event I have to process in Storm arrives as two messages from Rabbit, so I use a fieldsGrouping on the bolt so that both messages of a pair arrive at the same bolt task.
In my first approach I would:
- Receive the first Tuple and save the message in Memory
- Ack the first tuple
- When second Tuple arrived fetch the first from Memory and emit a new tuple anchored to the second from the spout.
This worked, but I could lose messages if a worker died, because I acked the first tuple before receiving the second and processing the pair.
I changed this to:
- Receive the first Tuple and save it in Memory
- When second Tuple arrived fetch the first from Memory, emit a new tuple anchored to both input tuples and ack both input tuples.
The in-memory cache is a Guava cache with time-based expiration; when a tuple is evicted due to timeout I fail() it in the topology so that it gets reprocessed later.
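For reference, here is a minimal sketch of what that second approach looks like as a bolt. The field name `correlation-id`, the 30-second timeout, and the output fields are my assumptions, and the package names are for Storm 1.x+ (older versions use `backtype.storm`):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class PairJoinBolt extends BaseRichBolt {
    private transient OutputCollector collector;
    // Holds the first half of each pair, keyed by the grouping field.
    private transient Cache<String, Tuple> pending;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.pending = CacheBuilder.newBuilder()
            .expireAfterWrite(30, TimeUnit.SECONDS)   // assumed timeout
            .removalListener(notification -> {
                // Only fail tuples that actually timed out; invalidate() below
                // also fires this listener, but with cause EXPLICIT.
                if (notification.getCause() == RemovalCause.EXPIRED) {
                    this.collector.fail((Tuple) notification.getValue());
                }
            })
            .build();
    }

    @Override
    public void execute(Tuple tuple) {
        String key = tuple.getStringByField("correlation-id"); // assumed field name
        Tuple first = pending.getIfPresent(key);
        if (first == null) {
            pending.put(key, tuple);   // hold the first half; do NOT ack yet
        } else {
            pending.invalidate(key);
            // Anchor to BOTH inputs so a downstream failure replays the whole pair.
            collector.emit(Arrays.asList(first, tuple),
                           new Values(key, first.getValue(1), tuple.getValue(1)));
            collector.ack(first);
            collector.ack(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("correlation-id", "part1", "part2"));
    }
}
```

One caveat with this sketch: Guava caches evict lazily, on access or during write operations, so an expired entry may not be failed until the cache is next touched; calling `pending.cleanUp()` periodically (e.g. on a tick tuple) makes the timeout-fail more timely.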
This seemed to work, but when I ran some tests I hit a situation where the topology stopped fetching messages from the RabbitMQ queue.
The prefetch on the queue is set to 5, and the spout has setMaxSpoutPending set to 7. In the RabbitMQ management interface I see 5 unacked messages.
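For completeness, these two knobs are set in different places: prefetch on the storm-rabbitmq consumer config, max spout pending on the Storm topology config. A sketch, with placeholder connection details and queue name (the builder calls follow the storm-rabbitmq README):

```java
import io.latent.storm.rabbitmq.config.ConnectionConfig;
import io.latent.storm.rabbitmq.config.ConsumerConfig;
import io.latent.storm.rabbitmq.config.ConsumerConfigBuilder;
import org.apache.storm.Config;

// Spout side: RabbitMQ channel prefetch of 5.
ConnectionConfig connectionConfig =
    new ConnectionConfig("localhost", 5672, "guest", "guest", "/", 10); // host, port, user, pass, vhost, heartbeat
ConsumerConfig spoutConfig = new ConsumerConfigBuilder()
    .connection(connectionConfig)
    .queue("events")     // assumed queue name
    .prefetch(5)
    .requeueOnFail()     // failed tuples are put back on the queue for redelivery
    .build();

// Topology side: at most 7 un-acked tuples in flight per spout task.
Config conf = new Config();
conf.setMaxSpoutPending(7);
```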
In the storm logs I see the same Tuples being evicted from the cache over and over again.
I understand the problem: the spout fetches only 5 messages, and all of them happen to be the first halves of different pairs, so the second halves are never delivered and nothing can be acked. I can increase the prefetch, but there is no guarantee this will not happen in production.
So my question is: How to implement a join while handling these problems in Storm?