I am designing an Apache Storm topology (using streamparse) with one spout (the Apache Kafka spout) and one bolt with parallelism > 1. The bolt reads messages from the spout in batches and persists them to a MySQL table.
The bolt processes messages in batches. If a batch completes successfully, I manually commit the Kafka offset.
When the bolt's MySQL insert fails, I don't commit the offset in Kafka, but some messages are already sitting in the queue of tuples that the spout has sent to the bolt.
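To make the intended control flow concrete, here is a minimal sketch of the batch-persist-then-commit logic described above. It is not real streamparse/Kafka/MySQL code: `OffsetTracker` and the `persist` callback are hypothetical stand-ins for the Kafka consumer's `commit()` and the MySQL insert; only the "commit the offset iff the whole batch was persisted" rule is the point.

```python
class OffsetTracker:
    """Hypothetical stand-in for the Kafka consumer's manual offset commit."""

    def __init__(self):
        self.committed = -1  # last committed Kafka offset

    def commit(self, offset):
        self.committed = offset


def process_batch(batch, tracker, persist):
    """Persist a batch of (offset, message) tuples, committing the Kafka
    offset only if the whole batch succeeds.

    `persist` stands in for the MySQL insert (e.g. a multi-row INSERT).
    Returns True on success, False if the batch must be replayed.
    """
    try:
        persist([msg for _, msg in batch])
    except Exception:
        # Insert failed: do NOT commit, so Kafka replays from the old offset.
        return False
    tracker.commit(max(off for off, _ in batch))
    return True
```

The failure branch is exactly where the problem in this question arises: the offset stays put, but tuples already queued downstream of the spout still reference positions after it.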
Those already-queued messages should be discarded, because I cannot advance the Kafka offset without losing the previously failed messages.
Is there a way in streamparse to clear, or fail, all the messages that are already in the queue at bolt startup?