
I am designing an Apache Storm topology (using streamparse) with one spout (an Apache Kafka spout) and one bolt with parallelism > 1. The bolt reads messages in batches from the Kafka spout and persists them to a MySQL table.

If a batch completes successfully, I manually commit the Kafka offset.

When the bolt's insert into MySQL fails, I don't commit the offset in Kafka, but some messages that the spout has already sent to the bolt are still waiting in the bolt's queue.

Those queued messages should be removed, because I cannot advance the Kafka offset without losing the previously failed messages.

Is there a way in streamparse to clear or fail all the messages that are already in the queue at bolt startup?


1 Answer


I don't know streamparse, but my impression is that you want to bundle tuples up and write them as a batch. Say you've written up to offset 10. Now your bolt receives offsets 11-15, and the batch fails to write. Offsets 16-20 are queued, and you don't want to process them right now, because that would process the batches out of order.

Is this understanding right?

First, I would stop committing offsets manually and let the spout handle that. Assuming you are using storm-kafka-client, you can configure it to commit an offset only once the corresponding tuple and all preceding tuples have been acked.

What you should probably do is keep track in the bolt (or even better, in your database) of the highest offset in the failed batch. Then when your bolt fails to write offsets 11-15, you can make the bolt fail every tuple with offset > 15. At some point you will receive offsets 11-15 again and can retry writing the batch. Since you failed all messages with offset > 15, they will also be retried, and will arrive after the messages in the failed batch.
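As a rough illustration of that bookkeeping, here is a minimal sketch in plain Python, independent of any Storm library. The class `BatchGate` and the `write_batch` callable are hypothetical names made up for this example, and the sketch assumes a single Kafka partition per bolt task and in-order delivery. It only decides which offsets to ack or fail; in a streamparse bolt you would presumably disable automatic acking and call `self.ack(tup)` / `self.fail(tup)` on the matching tuples yourself.

```python
class BatchGate:
    """Hypothetical helper: buffers tuples into batches and decides what to ack or fail."""

    def __init__(self, batch_size, write_batch):
        self.batch_size = batch_size
        # write_batch is an assumed callable taking a list of (offset, message)
        # pairs; it must raise an exception when the database write fails.
        self.write_batch = write_batch
        self.buffer = []
        # Highest offset of the most recent failed batch, or None if not blocked.
        self.blocked_above = None

    def process(self, offset, message):
        """Handle one tuple; return (offsets_to_ack, offsets_to_fail)."""
        # While a failed batch is pending, fail everything that comes after it
        # so the spout replays it later, behind the failed batch.
        if self.blocked_above is not None and offset > self.blocked_above:
            return [], [offset]

        self.buffer.append((offset, message))
        if len(self.buffer) < self.batch_size:
            return [], []  # batch not full yet, nothing to ack or fail

        batch, self.buffer = self.buffer, []
        offsets = [o for o, _ in batch]
        try:
            self.write_batch(batch)
        except Exception:
            # Remember where the failed batch ended and fail all of its tuples.
            self.blocked_above = max(offsets)
            return [], offsets

        # The write succeeded, so later offsets may be processed again.
        self.blocked_above = None
        return offsets, []
```

With a batch size of 5, a failed write of offsets 11-15 makes `process(16, ...)` return `([], [16])` until 11-15 have been replayed and written successfully. Keeping `blocked_above` in the database instead of in memory, as suggested above, would let the state survive a bolt restart.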

This solution assumes that you don't do reordering of the message stream between the spout and your writer bolt, so the messages arrive at the bolt in the order they are emitted.