24
votes

I just want to make sure I understand how acking works in Storm. I have 1 spout and 2 bolts chained together. Spout emits a tuple to Bolt1, which in turn emits a tuple to Bolt2. I want Bolt2 to ack the initial tuple sent from Spout, and I'm not sure how.

To guarantee fault tolerance (i.e. failed tuples are resent), I want Bolt2 to ack the tuple emitted by Spout, so that if it fails somewhere in the process it can be resent.

Consider this example:

Spout:

 _collector.emit(new Values(queue.dequeue()))

Bolt1:

def execute(tuple: Tuple) {
 _collector.emit(tuple, new Values("stuff"))
}

At this point tuple is the tuple sent by the spout. I can ack it here with no problems. Now add another bolt which listens for tuples emitted by Bolt1.

Bolt2:

def execute(tuple2: Tuple) {
 _collector.emit(tuple2, new Values("foo"))
}

At this point the tuple in tuple2 is the tuple sent from Bolt1 (the one that has string "stuff" in it).
So if I send an ack in Bolt2 this will ack the tuple from Bolt1 not the one sent from Spout. Correct?

How can I ack the tuple that was sent from the spout? Should I piggyback the initial spout tuple on all the other tuples so I can retrieve it in the last bolt and ack it?

I read Nathan's tutorials and I got the impression that I could ack the tuple received in Bolt1 (from Spout) right there after emitting tuple2. This would link the newly emitted tuple2 to the original tuple sent by Spout, so when Bolt2 acks tuple2 it actually acks the original tuple from the Spout. Is this true?

Let me know if I'm missing something in my explanation.

3 Answers

28
votes

For those interested, I found a solution by asking on the Storm group. What I need is for the Spout to emit tuples the following way (with a unique ID):

Spout:

 //ties the tuple to this UID (the message ID is the second argument to emit)
 _collector.emit(new Values(queue.dequeue()), uniqueID)

Then Bolt1 acks the tuple only after it emits to Bolt2.

Bolt1:

 //emit first then ack
 _collector.emit(tuple, new Values("stuff")) //**anchoring** - read below to see what it means
 _collector.ack(tuple) 

At this point the tuple from Spout has been acked in Bolt1, but the newly emitted tuple "stuff" sent to Bolt2 is "anchored" to the tuple from Spout. This means the new tuple still needs to be acked later on; otherwise the Spout tuple times out and is resent by the Spout.

Bolt2:

 _collector.ack(tuple) 

Bolt2 needs to ack the tuple received from Bolt1, which sends the last ack that Spout was waiting for. If Bolt2 also emits an anchored tuple at this point, then there must be a Bolt3 that receives it and acks it. If the tuple is not acked at the last point, it times out and is failed back to the Spout, which can then resend it.
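As far as I understand it, Storm does not resend anything by itself: on timeout it calls fail on the spout with the message ID, and the spout decides whether to emit the message again. Here is a minimal sketch of that bookkeeping in plain Java. It does not use any Storm classes; ReplayingSpoutSketch and its methods are hypothetical names that only mirror the spout callbacks nextTuple, ack and fail.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of a reliable spout's bookkeeping; not a Storm class.
class ReplayingSpoutSketch {
    private final Queue<String> queue = new ArrayDeque<>();     // messages waiting to be emitted
    private final Map<Long, String> pending = new HashMap<>();  // emitted but not yet acked
    private long nextId = 0;

    void offer(String message) {
        queue.add(message);
    }

    // Mirrors nextTuple(): take a message, remember it under a fresh ID, "emit" it.
    // Returns the message ID, or -1 if nothing is queued.
    long nextTuple() {
        String message = queue.poll();
        if (message == null) {
            return -1;
        }
        long msgId = nextId++;
        pending.put(msgId, message);
        return msgId;
    }

    // Mirrors ack(msgId): the whole tuple tree was acked, so forget the message.
    void ack(long msgId) {
        pending.remove(msgId);
    }

    // Mirrors fail(msgId): the tree failed or timed out, so queue the message again.
    void fail(long msgId) {
        String message = pending.remove(msgId);
        if (message != null) {
            queue.add(message);
        }
    }

    int pendingCount() { return pending.size(); }

    int queuedCount() { return queue.size(); }

    public static void main(String[] args) {
        ReplayingSpoutSketch spout = new ReplayingSpoutSketch();
        spout.offer("stuff");
        long first = spout.nextTuple();
        spout.fail(first);                  // e.g. Bolt2 never acked in time
        System.out.println("queued after fail: " + spout.queuedCount());   // 1
        long second = spout.nextTuple();    // replayed under a new ID
        spout.ack(second);
        System.out.println("pending after ack: " + spout.pendingCount());  // 0
    }
}
```

In a real spout the pending map would be keyed by whatever unique ID you pass as the second argument to emit.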

Each time anchoring is done on an emit statement from bolt to bolt, a new node in a "tree" structure is built... well, more like a list in my case, since I never send the same tuple to 2 or more bolts; I have a 1 to 1 relationship.

All nodes in the tree need to be acked, and only then is the tuple marked as fully processed. If a tuple is sent with a UID and anchored later on but never acked, it stays tracked in memory until it is acked or times out.
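To make the "all nodes must be acked" rule concrete, here is a rough sketch of the XOR trick that the Storm docs describe for the acker: every tuple ID is XORed into a single value once when the tuple is created and once when it is acked, so the value returns to zero only when every node in the tree has been acked. This is plain Java, not Storm code; AckerSketch and its methods are hypothetical, and real Storm batches these updates differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-in for Storm's acker; not the real implementation.
class AckerSketch {
    // spout message ID -> XOR of every tuple ID created or acked so far
    private final Map<Object, Long> ackVals = new HashMap<>();
    private final Random random = new Random(42);

    // Random non-zero 64-bit tuple ID.
    private long newTupleId() {
        long id;
        do {
            id = random.nextLong();
        } while (id == 0L);
        return id;
    }

    // The spout emits a root tuple tied to msgId.
    long spoutEmit(Object msgId) {
        long id = newTupleId();
        ackVals.put(msgId, id);
        return id;
    }

    // A bolt emits a new tuple anchored to the tree rooted at msgId.
    long anchoredEmit(Object msgId) {
        long id = newTupleId();
        ackVals.merge(msgId, id, (a, b) -> a ^ b);
        return id;
    }

    // A bolt acks a tuple: XORing the same ID a second time cancels it out.
    void ack(Object msgId, long tupleId) {
        ackVals.merge(msgId, tupleId, (a, b) -> a ^ b);
    }

    // The tree is complete once every created tuple has also been acked.
    boolean isFullyProcessed(Object msgId) {
        Long value = ackVals.get(msgId);
        return value != null && value == 0L;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long fromSpout = acker.spoutEmit("msg-1");
        long fromBolt1 = acker.anchoredEmit("msg-1"); // Bolt1: emit anchored to the spout tuple
        acker.ack("msg-1", fromSpout);                // Bolt1: ack its input
        System.out.println(acker.isFullyProcessed("msg-1")); // false, "stuff" is still pending
        acker.ack("msg-1", fromBolt1);                // Bolt2: ack the tuple from Bolt1
        System.out.println(acker.isFullyProcessed("msg-1")); // true
    }
}
```

This is why acking in Bolt1 does not complete the tree by itself: the anchored emit has already added a new node that only Bolt2 can cancel out.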

Hope this helps.

0
votes

You can read about this in the official documentation.

If you want to track the execution of your tuples throughout all bolts, you can use BaseBasicBolt as the parent class, where this behaviour is already defined.

In any other use case (e.g. you want the tuple to be acked before execution reaches the last bolt) you should manually define the links between your tuples (called anchoring). Refer to the documentation.

0
votes

You need to anchor the tuple. Take a look at Guaranteeing-message-processing. In particular, you need this:

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));