
I'm implementing a bolt in Storm that receives messages from a RabbitMQ spout (https://github.com/ppat/storm-rabbitmq).

Each event I need to process in Storm arrives as two messages from RabbitMQ, so I use a fieldsGrouping on the bolt to make sure both messages reach the same bolt task.
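Conceptually, a fields grouping hashes the values of the grouping fields and maps the hash onto one of the bolt's tasks, so two messages with the same key always land on the same task. The sketch below is a simplified model of that routing, assuming a hypothetical `eventId`-style join key; the hash function is illustrative and not Storm's exact implementation.

```java
import java.util.List;
import java.util.Objects;

// Simplified model of hash-based fields grouping: tuples with equal
// grouping-field values are always routed to the same task index.
// The hash used here is illustrative, not Storm's actual implementation.
class FieldsGroupingModel {
    static int chooseTask(List<Object> groupingValues, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(Objects.hash(groupingValues.toArray()), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        // Both halves of one event share the hypothetical key "evt-42"
        int firstHalf = chooseTask(List.of("evt-42"), numTasks);
        int secondHalf = chooseTask(List.of("evt-42"), numTasks);
        System.out.println(firstHalf == secondHalf); // true
    }
}
```

Note that this only guarantees co-location; it says nothing about the order or timing in which the two halves arrive.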

In my first approach, I would:

  1. Receive the first tuple and save the message in memory.
  2. Ack the first tuple.
  3. When the second tuple arrives, fetch the first one from memory and emit a new tuple anchored to the second tuple from the spout.

This worked, but I could lose messages if a worker died, because I acked the first tuple before receiving the second one and processing the pair.
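A toy model makes the loss window visible. It assumes, as in Storm, that the spout only replays messages it still considers pending (not yet acked); the message ids, the key, and the `crashWorker()` step are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the first approach: the bolt acks the first half as soon as it
// is buffered in memory. Ids, keys and the crash are hypothetical.
class AckFirstJoinModel {
    // Ids the spout still considers in flight; only these are replayed on failure.
    final Set<String> pendingAtSpout = new HashSet<>();
    // Bolt-local, non-durable buffer: it disappears if the worker dies.
    final Map<String, String> memoryBuffer = new HashMap<>();

    void emitFromSpout(String msgId) { pendingAtSpout.add(msgId); }

    void ack(String msgId) { pendingAtSpout.remove(msgId); }

    void receiveFirstHalf(String key, String msgId) {
        memoryBuffer.put(key, msgId);
        ack(msgId); // acked before the join completes: the spout forgets it
    }

    void crashWorker() { memoryBuffer.clear(); }

    public static void main(String[] args) {
        AckFirstJoinModel m = new AckFirstJoinModel();
        m.emitFromSpout("msg-1");               // first half of event "evt-42"
        m.receiveFirstHalf("evt-42", "msg-1");
        m.emitFromSpout("msg-2");               // second half, not yet processed
        m.crashWorker();
        // Only msg-2 can be replayed; msg-1 now exists nowhere.
        System.out.println(m.pendingAtSpout);         // [msg-2]
        System.out.println(m.memoryBuffer.isEmpty()); // true
    }
}
```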

I changed this to:

  1. Receive the first tuple and save it in memory.
  2. When the second tuple arrives, fetch the first one from memory, emit a new tuple anchored to both input tuples, and ack both input tuples.
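The second approach can be sketched as follows. In a real bolt this would use Storm's `OutputCollector.emit(Collection<Tuple> anchors, List<Object> tuple)` and `OutputCollector.ack(Tuple)`; here `Msg` and `Collector` are hypothetical stand-ins so the example runs without Storm on the classpath.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the second approach: buffer the first half unacked, and when the
// second half arrives emit one joined record anchored to BOTH inputs, then
// ack both. Msg and Collector are hypothetical stand-ins for Storm types.
class PairJoiner {
    record Msg(String id, String key, String payload) {}

    interface Collector {
        void emit(List<Msg> anchors, List<Object> values);
        void ack(Msg input);
    }

    private final Map<String, Msg> firstHalves = new HashMap<>();
    private final Collector collector;

    PairJoiner(Collector collector) { this.collector = collector; }

    void execute(Msg input) {
        Msg first = firstHalves.remove(input.key());
        if (first == null) {
            // First half: keep it and do NOT ack yet, so the spout can replay it.
            firstHalves.put(input.key(), input);
            return;
        }
        // Second half: anchor the joined output to both inputs, then ack both.
        collector.emit(List.of(first, input),
                List.of(input.key(), first.payload() + "|" + input.payload()));
        collector.ack(first);
        collector.ack(input);
    }

    int buffered() { return firstHalves.size(); }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        PairJoiner j = new PairJoiner(new Collector() {
            public void emit(List<Msg> anchors, List<Object> values) { log.add("emit " + values); }
            public void ack(Msg input) { log.add("ack " + input.id()); }
        });
        j.execute(new Msg("m1", "evt-42", "A"));
        j.execute(new Msg("m2", "evt-42", "B"));
        System.out.println(log); // [emit [evt-42, A|B], ack m1, ack m2]
    }
}
```

The catch is that every buffered first half stays pending at the spout, so it counts against the spout's in-flight limits until its partner shows up.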

The in-memory cache is a Guava cache with time-based expiration, and when a tuple is evicted due to a timeout I fail() it in the topology so that it gets reprocessed later.
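With Guava, this would typically be built from `CacheBuilder` with `expireAfterWrite` and a `removalListener` that checks for `RemovalCause.EXPIRED`. Guava does not evict on a background timer, though; expired entries are removed during other cache operations or an explicit `cleanUp()` call. The sketch below swaps Guava for a plain-JDK `LinkedHashMap` so it runs standalone; the class name and the explicit `nowMillis` parameters are assumptions made for testability, and `onExpired` is where a bolt would call `fail()`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-JDK stand-in for an expiring cache: entries older than ttlMillis are
// removed and passed to onExpired (where a bolt would fail() the tuple).
// Assumes nowMillis never decreases between calls.
class ExpiringPairBuffer<V> {
    private record Slot<V>(V value, long insertedAt) {}

    // Insertion order == age order, so expired entries sit at the front.
    private final LinkedHashMap<String, Slot<V>> entries = new LinkedHashMap<>();
    private final long ttlMillis;
    private final Consumer<V> onExpired;

    ExpiringPairBuffer(long ttlMillis, Consumer<V> onExpired) {
        this.ttlMillis = ttlMillis;
        this.onExpired = onExpired;
    }

    void put(String key, V value, long nowMillis) {
        entries.remove(key); // re-inserting moves the key to the back
        entries.put(key, new Slot<>(value, nowMillis));
    }

    V remove(String key) {
        Slot<V> s = entries.remove(key);
        return s == null ? null : s.value();
    }

    // Call periodically, e.g. at the start of each execute or on a tick tuple.
    void evictExpired(long nowMillis) {
        Iterator<Map.Entry<String, Slot<V>>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Slot<V> s = it.next().getValue();
            if (nowMillis - s.insertedAt() < ttlMillis) break; // the rest are newer
            it.remove();
            onExpired.accept(s.value());
        }
    }

    public static void main(String[] args) {
        List<String> failed = new ArrayList<>();
        ExpiringPairBuffer<String> buf = new ExpiringPairBuffer<>(1000, failed::add);
        buf.put("evt-1", "msg-1", 0);
        buf.put("evt-2", "msg-2", 500);
        buf.evictExpired(1200); // msg-1 is 1200 ms old, msg-2 only 700 ms
        System.out.println(failed); // [msg-1]
    }
}
```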

This seemed to work, but during testing I ran into a situation where the system stopped fetching messages from the RabbitMQ queue.

The prefetch on the queue is set to 5, and the spout's setMaxSpoutPending to 7. In the RabbitMQ interface I see 5 unacked messages.

In the Storm logs I see the same tuples being evicted from the cache over and over again.

I understand the problem: the spout only fetches 5 messages, and they can all be the first part of a pair. I could increase the prefetch, but that is no guarantee this will not happen in production.
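Assuming a single spout task, at most min(5, 7) = 5 messages can be in flight, so the prefetch is the binding limit. The toy simulation below illustrates why the pipeline stalls once every in-flight slot holds an unmatched first half. Its assumptions are hypothetical and chosen for illustration: messages are named `"<eventId>:<part>"`, and a failed message is requeued at the head of the queue, so the same window is redelivered every round.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy simulation of the prefetch stall. Hypothetical assumptions: messages are
// "<eventId>:<part>", at most `prefetch` are unacked at once, and failed
// messages are requeued at the head of the queue.
class PrefetchStallSimulation {
    // Returns true if a round completes no event at all (the same messages
    // would then be redelivered forever); false if no stall within maxRounds.
    static boolean stalls(List<String> queueOrder, int prefetch, int maxRounds) {
        Deque<String> queue = new ArrayDeque<>(queueOrder);
        for (int round = 0; round < maxRounds && !queue.isEmpty(); round++) {
            // The broker hands out at most `prefetch` messages.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < prefetch && !queue.isEmpty()) {
                inFlight.add(queue.pollFirst());
            }
            // The bolt can only complete events with both parts in flight.
            Set<String> seen = new HashSet<>();
            Set<String> complete = new HashSet<>();
            for (String m : inFlight) {
                String event = m.split(":")[0];
                if (!seen.add(event)) complete.add(event);
            }
            // Unmatched messages time out, are failed, and return to the head.
            List<String> failed = new ArrayList<>();
            for (String m : inFlight) {
                if (!complete.contains(m.split(":")[0])) failed.add(m);
            }
            for (int i = failed.size() - 1; i >= 0; i--) queue.addFirst(failed.get(i));
            if (complete.isEmpty()) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // All first halves were published before any second half.
        List<String> firstHalvesFirst = List.of(
                "a:1", "b:1", "c:1", "d:1", "e:1", "a:2", "b:2", "c:2", "d:2", "e:2");
        System.out.println(stalls(firstHalvesFirst, 5, 10));  // true
        System.out.println(stalls(firstHalvesFirst, 10, 10)); // false
    }
}
```

Raising the prefetch to 10 clears this particular input, but six events published first-halves-first would stall it again, which matches the concern that a larger prefetch is no guarantee.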

So my question is: how do I implement a join in Storm while handling these problems?


Answer


Storm does not provide a good solution for this. What you need is reliable storage that buffers the first tuple (i.e., a stateful operator). That way, you can ack the first tuple immediately and recover the state after a failure.

  1. As far as I know, Trident supports some state handling, but I have never used it.
  2. As a second alternative, you could use a distributed key-value store (like Cassandra) as a buffer. Of course, this would be a hand-written solution, i.e., you would need to code all Cassandra interactions yourself.
  3. Last but not least, you could switch to a stream processing system that supports stateful operators, like Apache Flink. (Disclaimer: I am a committer at Flink.)
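Option 2 could look roughly like the sketch below: persist the first half to an external store, and only then ack it, so a worker crash no longer loses it. `KeyValueStore` is a hypothetical interface (a real version would wrap, e.g., a Cassandra client), and `InMemoryStore` is a non-durable stand-in so the example runs. The get-then-put sequence is not atomic; it relies on the fields grouping sending each key to a single task, and on each task processing tuples one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a join buffered in an external store: the first half is persisted
// and then acked immediately, so it survives a worker restart.
class DurableJoinBolt {
    private final KeyValueStore store;
    final List<String> emitted = new ArrayList<>();
    final List<String> acked = new ArrayList<>();

    DurableJoinBolt(KeyValueStore store) { this.store = store; }

    void execute(String msgId, String key, String payload) {
        Optional<String> first = store.get(key);
        if (first.isEmpty()) {
            store.put(key, payload); // persist BEFORE acking
            acked.add(msgId);        // now safe to ack the first half
            return;
        }
        // A crash between emit and delete means a replay re-emits the pair
        // (at-least-once), so downstream should tolerate duplicates.
        emitted.add(key + "=" + first.get() + "|" + payload);
        acked.add(msgId);
        store.delete(key);
    }

    public static void main(String[] args) {
        KeyValueStore store = new InMemoryStore();
        new DurableJoinBolt(store).execute("m1", "evt-42", "A"); // persisted, acked
        // Simulate a worker restart: a new bolt instance, same external store.
        DurableJoinBolt restarted = new DurableJoinBolt(store);
        restarted.execute("m2", "evt-42", "B");
        System.out.println(restarted.emitted); // [evt-42=A|B]
    }
}

// Hypothetical store interface; not a real Cassandra API.
interface KeyValueStore {
    void put(String key, String value);
    Optional<String> get(String key);
    void delete(String key);
}

// In-memory stand-in so the example runs; it is NOT durable.
class InMemoryStore implements KeyValueStore {
    private final Map<String, String> data = new ConcurrentHashMap<>();
    public void put(String key, String value) { data.put(key, value); }
    public Optional<String> get(String key) { return Optional.ofNullable(data.get(key)); }
    public void delete(String key) { data.remove(key); }
}
```

Because the first half is acked right away, it no longer occupies a prefetch slot, which also removes the stall described in the question. Entries whose second half never arrives would still need some cleanup policy, such as a TTL in the store.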