I am using storm-kafka 1.1.1 with Storm 1.1.1. The topology has one KafkaSpout and two bolts, bolt-A and bolt-B, both built on BaseRichBolt. Tuples are anchored in bolt-A; once bolt-B acks a tuple, it is considered successfully processed and its offset is committed. The problem is that, for some reason, some failed messages get duplicated by the KafkaSpout.
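To make the setup concrete, this is roughly how the two bolts are wired (a simplified sketch, not my exact code; the class and field names are placeholders, and I'm assuming the spout's default string scheme for `input.getString(0)`):

```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Bolt-A: anchors the outgoing tuple to the input, so a failure
// downstream in bolt-B propagates back to the KafkaSpout as a replay.
public class BoltA extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // emit with the input tuple as the anchor, then ack this hop
        collector.emit(input, new Values(input.getString(0)));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

// Bolt-B: terminal bolt; acking here completes the tuple tree, while
// failing triggers a replay from the KafkaSpout.
public class BoltB extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            process(input);        // placeholder for the real work
            collector.ack(input);  // tuple tree complete -> offset can be committed
        } catch (Exception e) {
            collector.fail(input); // spout will replay this tuple
        }
    }

    private void process(Tuple input) { /* ... */ }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
```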
For example:
The KafkaSpout emitted 1000 tuples; while processing them, roughly 20 tuples failed (at bolt-B). Those 20 tuples were replayed continuously. At some point the worker was killed and the supervisor restarted it; the 20 tuples were replayed again, and this time they were processed successfully, but multiple times (duplicated).
But I want those tuples to be processed exactly once (and successfully). I have set topology.enable.message.timeouts to false. My other question is: where does Storm store the details of those failed Kafka offsets? I didn't find them in ZooKeeper; it only has the detail below:
{"topology":{"id":"test_Topology-12-1508938595","name":"test_Topology"},"offset":505,"partition":2,"broker":{"host":"127.0.0.1","port":9092},"topic":"test_topic_1"}
