
I have real-time data that is handled by a Storm topology. The data can be of four types, let's say A, B, C, and D. Tuples of each type are consumed by a bolt in random order. What I need to do is compare two tuples of the same data type. For example, I want to compare a type A tuple with the next type A tuple, or, put another way, compare the current type A tuple with the previously received type A tuple. Is there a way to do this in the bolt? Or do I have to save the previous result somewhere in a database (let's say HBase or a cache) and query it to compare with the current tuple of a particular type?

EDIT

Let's say a stream of data of types A, B, C, and D is coming from the spout:

B4 A4 C7 D2 A3 A2 B3 C6 D1 B2 C5 C4 B1 C3 C2 C1 A1-----> Spout --> BOLT

Now, at the bolt, I want to compare A1 with A2, A2 with A3, and A3 with A4. Similarly, B1 with B2, B2 with B3, and so on.
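A minimal sketch of the in-bolt approach, assuming a parallelism of one for both spout and bolt so that every tuple passes through one bolt instance in arrival order. The class and method names (`PerTypeComparator`, `onTuple`, `PerTypeCompareDemo`) are hypothetical, and the Storm classes are left out so the logic runs on its own; in a real bolt, `onTuple` would be called from `execute()` with the type and value read from the incoming tuple. The "comparison" here just reports which pair is compared, since the question does not say what the comparison computes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free model of the bolt's execute() logic:
// remember the last value seen for each type and compare the next one against it.
class PerTypeComparator {
    // Last value received for each type ("A", "B", ...). This lives in the bolt
    // instance, so it is only correct if every tuple of a type reaches this instance.
    private final Map<String, Integer> previous = new HashMap<>();

    // Returns "A1 vs A2"-style text, or null for the first tuple of a type.
    String onTuple(String type, int value) {
        Integer prev = previous.put(type, value); // store current, get the old one back
        if (prev == null) {
            return null; // nothing to compare with yet
        }
        return type + prev + " vs " + type + value;
    }
}

class PerTypeCompareDemo {
    public static void main(String[] args) {
        // Arrival order: the rightmost tuple in the diagram (A1) reaches the bolt first.
        String[] arrivals = {"A1", "C1", "C2", "C3", "B1", "C4", "C5", "B2",
                             "D1", "C6", "B3", "A2", "A3", "D2", "C7", "A4", "B4"};
        PerTypeComparator cmp = new PerTypeComparator();
        for (String t : arrivals) {
            String result = cmp.onTuple(t.substring(0, 1), Integer.parseInt(t.substring(1)));
            if (result != null) {
                System.out.println(result);
            }
        }
    }
}
```

Keeping the state in a map keyed by type, rather than one "previous" variable, is what lets a single bolt instance handle all four types at once.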

Storm has no ordering guarantees. So what do you mean by comparing a "type A tuple with the next type A tuple"? If a bolt has multiple producer tasks (i.e., the single producer bolt has a degree of parallelism (dop) > 1), type A tuples can be delivered over each channel, and there is no ordering across those channels. - Matthias J. Sax
I have updated the example; see the edit. - big
But this only works if both your spout and bolt have a parallelism of one! Is this really what you want? - Matthias J. Sax
Yes, for now I want it with a parallelism of one. I think this will be difficult with a parallelism of two or more; in that case I think I would have to store the data somewhere and query it. Am I right? - big
If you use a reliable spout and enable fault tolerance, you can buffer the first tuple in an internal variable and "wait" for the second tuple to arrive. Then you can compare both, drop the first tuple, and ack it. Do not ack it as long as you still need it for comparison; otherwise you break fault tolerance. - Matthias J. Sax
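A rough sketch of the buffer-then-ack pattern described in the comment above, kept free of Storm classes so it runs standalone. Everything here is hypothetical: the type parameter `T` stands in for a Storm `Tuple`, and the `ack` callback stands in for calling `OutputCollector.ack(tuple)`. The key point is that a tuple is acked only after its successor of the same type has arrived and been compared with it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical sketch: T stands in for a Storm Tuple, and the ack callback
// stands in for OutputCollector.ack(tuple).
class BufferingComparer<T> {
    private final Map<String, T> pending = new HashMap<>(); // newest unacked tuple per type
    private final BiConsumer<T, T> compare;                 // called with (previous, current)
    private final Consumer<T> ack;

    BufferingComparer(BiConsumer<T, T> compare, Consumer<T> ack) {
        this.compare = compare;
        this.ack = ack;
    }

    void execute(String type, T current) {
        // The current tuple stays buffered (and unacked) until the next tuple of its type arrives.
        T previous = pending.put(type, current);
        if (previous != null) {
            compare.accept(previous, current);
            ack.accept(previous); // only now is the earlier tuple no longer needed
        }
    }
}
```

One consequence to keep in mind: the newest tuple of each type sits unacked while it waits, so if its successor takes longer than the topology's message timeout (`topology.message.timeout.secs`), Storm will consider it failed and the spout may replay it.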

1 Answer


You could declare the type of your data as a field (for example "type") when you emit your tuples in the spout. Then you can use fields grouping on that field, so every type A tuple will go to the same task. This way you could have up to 4 different tasks executing your bolt code. Within each task, tuples from a single spout task arrive in the order they were emitted.

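// parallelism hint 4: at most one task per data type; "type" must be a declared output field of the spout
builder.setBolt(BOLT_NAME, new BoltClass(), 4)
       .fieldsGrouping(SPOUT_NAME, new Fields("type"));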

Definition of fields grouping from the Storm documentation:

Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.

http://storm.apache.org/documentation/Concepts.html
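To make "tuples with the same field value always go to the same task" concrete, here is a simplified illustration of the routing idea: hash the grouping field's value and take it modulo the number of bolt tasks. This is an assumption-laden stand-in, not Storm's actual hash function, and `FieldsGroupingSketch` and `chooseTask` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Simplified illustration of fields-grouping routing:
// hash the grouping value, then map it onto one of numTasks bolt tasks.
// NOT Storm's exact hash function, only the idea behind it.
class FieldsGroupingSketch {
    static int chooseTask(String groupingValue, int numTasks) {
        // floorMod keeps the index non-negative even if hashCode() is negative
        return Math.floorMod(groupingValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // matches the parallelism hint of 4 passed to setBolt(...)
        List<String> stream = Arrays.asList("A1", "C1", "B1", "A2", "D1", "C2", "A3");
        for (String tuple : stream) {
            String type = tuple.substring(0, 1); // the "type" field used for grouping
            System.out.println(tuple + " -> task " + chooseTask(type, numTasks));
        }
    }
}
```

Every A tuple lands on the same task, so that task can keep "previous tuple" state for type A locally. Note that distinct values are not guaranteed distinct tasks: two types can hash to the same task, which is why "4 tasks" is an upper bound and why the bolt should keep its state keyed by type rather than in a single variable.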