1
votes

I'm trying to simulate streaming traffic from Kafka to Storm. I used a KafkaSpout to read messages from a topic that a producer fills with tweets. My problem is that after the topology consumes all the tweets sent to the topic, it continues to read the same messages from the topic again. How can I stop the KafkaSpout from reading messages twice? (The replication factor is set to 1.)

1
Before anything else, please make sure you're using the latest Storm version. If it still doesn't work, please post your topology configuration (topology wiring, maybe also pom.xml). - Stig Rohde Døssing
Thanks for your reply. I edited my post with this information. - Marco Domenicano
Could you post your spout configuration as well? Noticed a few other things you might want to change: The scope of storm-core should be "provided", not "compile". In Classifier.execute you can end up acking the tuple twice if an exception happens. You need to make sure only to ack the tuple exactly once, or Storm will consider it failed and replay it. Finally consider upgrading to storm-kafka-client as well as a new Kafka version. 0.8.2.2 is very old, and storm-kafka is deprecated for removal. - Stig Rohde Døssing
Ok, thank you, I think one of the errors is the double acking! I tried to migrate to storm-kafka-client, but it seems it can't read data from the topic. I updated my first post with the KafkaSpout creator and its configuration. Thank you very much for the help; I'm actually a newbie with these frameworks. - Marco Domenicano
Yes, it can't read the data because your Kafka is too old. storm-kafka-client requires Kafka 0.10.1.0 (as far as I recall). But you can keep using storm-kafka if you want; I just wanted to make sure you know it will be deleted as of Storm 2.0.0. storm-kafka also isn't compatible with Kafka versions after 2.0.0. - Stig Rohde Døssing

1 Answer

0
votes

The configuration looks fine to me.

Maybe the issue is double acking. Make sure you ack each tuple exactly once in execute. If Storm sees a tuple acked more than once (or acked and then failed), it can consider the tuple failed and replay it, which looks like the spout reading the topic twice.
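A sketch of the single-ack pattern in a BaseRichBolt (the class and method names below are illustrative, since your Classifier code isn't shown; the bug to avoid is acking in the try block and then acking again in the catch/finally):

```java
import org.apache.storm.task.OutputCollector;
import org.apache.storm.tuple.Tuple;

// Inside your BaseRichBolt subclass; 'collector' is the OutputCollector
// saved in prepare(), 'classify' is a hypothetical processing method.
public void execute(Tuple tuple) {
    try {
        classify(tuple.getStringByField("tweet")); // may throw
        collector.ack(tuple);                      // ack on success only
    } catch (Exception e) {
        collector.fail(tuple);                     // on failure: fail, never ack a second time
    }
}
```

Each tuple takes exactly one of the two paths, so it is acked or failed exactly once.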

As mentioned in a comment, please consider upgrading to a newer Kafka version, as well as switching to storm-kafka-client.

Also something that may make your life a little easier: Consider extending BaseBasicBolt instead of BaseRichBolt. BaseBasicBolt automatically acks the tuple for you if running execute doesn't throw an error. If you want to fail a tuple you can throw FailedException. BaseRichBolt should only be used if you want to do more complicated acking, e.g. aggregating tuples from many execute invocations in-memory before acking.
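A minimal sketch of that pattern (the field and stream names are illustrative, and classify stands in for your actual processing logic):

```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ClassifierBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        try {
            String label = classify(tuple.getStringByField("tweet"));
            collector.emit(new Values(label)); // emitted tuples are anchored automatically
        } catch (Exception e) {
            throw new FailedException(e);      // tells Storm to fail (and replay) this tuple
        }
        // No explicit ack: BaseBasicBolt acks for you when execute returns normally.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("label"));
    }

    // Hypothetical classification logic.
    private String classify(String tweet) {
        return tweet.contains("!") ? "excited" : "neutral";
    }
}
```

This removes the possibility of double acking entirely, which is why it's the safer choice for simple one-tuple-in, n-tuples-out bolts.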