4 votes

I'm trying to figure out why all my Kafka messages are getting replayed every time I restart my Storm topology.

My understanding of how it should work was that once the last Bolt has ack'ed the tuple, the spout should commit the message on Kafka, and hence I should not see it replayed after a restart.

My code is a simple KafkaSpout and a Bolt which just prints every message and then acks it.

private static KafkaSpout buildKafkaSpout(String topicName) {
    ZkHosts zkHosts = new ZkHosts("localhost:2181");
    SpoutConfig spoutConfig = new SpoutConfig(zkHosts, 
            topicName, 
            "/" + topicName, 
            "mykafkaspout");      /*was:UUID.randomUUID().toString()*/
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new KafkaSpout(spoutConfig);
}

public static class PrintBolt extends BaseRichBolt {
    OutputCollector _collector;
    public static Logger LOG = LoggerFactory.getLogger(PrintBolt.class);

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        LOG.error("PrintBolt.0: {}",tuple.getString(0));
        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("nothing"));
    }
}

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("kafka", buildKafkaSpout("mytopic"), 1);
    builder.setBolt("print1", new PrintBolt(),1).shuffleGrouping("kafka");
}

I have not provided any config settings other than those in the code.

Am I missing a config-setting or what am I doing wrong?

UPDATE:

To clarify, everything works fine until I restart the pipeline. The behavior below is what I get with other (non-Storm) consumers, and what I expected from the KafkaSpout.

My expectations: [diagram: expected behavior]

However, the actual behavior I'm getting with the default settings is the following: the messages are processed fine up until I stop the pipeline, and then when I restart it I get a replay of all the messages, including those (A and B) which I believed I had already ack'ed.

What actually happens: [diagram: EarliestTime, default behavior]

As per the configuration options mentioned by Matthias, I can change the startOffsetTime to Latest, however that means starting from the literal latest offset, so the pipeline drops the messages (message "C") that were produced while the pipeline was restarting.

[diagram: LatestTime, messages dropped]
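For reference, changing that option on the spout config from the question looks roughly like the sketch below (assuming the storm-kafka SpoutConfig/KafkaConfig fields; startOffsetTime only determines where the spout starts when it has no usable committed offset to resume from):

// Sketch: start from the latest offset instead of the earliest when no
// committed offset is available -- this is what drops message "C" above.
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
// The default is kafka.api.OffsetRequest.EarliestTime(), which is why a
// topology with no persisted offset state replays the topic from the start.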

I have a consumer written in NodeJS (using npm kafka-node) which is able to commit (ack) messages to Kafka, and when I restart the NodeJS consumer it does exactly what I expected (it catches up on message "C", which was produced while the consumer was down, and continues from there) -- so how do I get the same behavior with the KafkaSpout?

You mean, as long as the topology is running everything is fine? There are no failed tuples? However, when you kill topology and re-submit it, it processes old tuples again instead of resuming from the last Kafka offset that got processed and acked? (btw: if bolt is a sink as in your case, you don't need to declare an output stream. You can leave declareOutputFields empty.) - Matthias J. Sax
Yes, that is exactly what I mean -- it works fine with no failed tuples, but when restarting I see a replay of the already processed tuples. Playing around with the startOffsetTime as per your link changes the behavior, and setting it to LatestTime the spout now drops all messages which were sent while the spout was not running -- which is no good either -- so not a solution. - Soren
I see. However, the behavior itself is correct. For your use case, you need to make sure, that the offset is stored in Zookeeper such that it can be picked up at redeploy... From the link: "Important: When re-deploying a topology make sure that the settings for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case." -- It seems something is wrong with Zookeeper in your case. - Matthias J. Sax
ZkHosts zkHosts = new ZkHosts("localhost:2181"); You are not using a production setup, which persists the consumer state... You need to configure kafka - SQL.injection
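(A quick way to verify whether the spout's consumer state is actually being persisted, as the comments suggest checking, is to inspect ZooKeeper under the path formed by the zkRoot and id from the question's SpoutConfig. The exact child-node layout depends on the storm-kafka version, so treat this as a sketch only.)

$ zkCli.sh -server localhost:2181
# then, at the zkCli prompt, list the spout's state under zkRoot + "/" + id:
ls /mytopic/mykafkaspout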

1 Answer

4 votes

The problem was in the submit code -- the template code for submitting the topology creates an instance of LocalCluster if the storm jar is run without a topology name, and the local cluster does not persist the consumer state (the offsets), hence the replay.

So

$ storm jar myjar.jar storm.myorg.MyTopology topologyname

will launch it on my single-node development cluster, whereas

$ storm jar myjar.jar storm.myorg.MyTopology

will launch it on an instance of LocalCluster.
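For completeness, the submit template follows the common Storm pattern sketched below (a sketch only, using the class and placeholder names from the question, not the exact original code): with a name argument the topology is submitted to the real cluster via StormSubmitter, where the Kafka offsets survive in ZooKeeper; without one it falls back to a throwaway LocalCluster.

// (imports omitted, as in the question's snippets)
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka", buildKafkaSpout("mytopic"), 1);
    builder.setBolt("print1", new PrintBolt(), 1).shuffleGrouping("kafka");

    Config conf = new Config();
    if (args != null && args.length > 0) {
        // A topology name was given on the command line:
        // submit to the (single-node) cluster, where offsets are persisted.
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        // No name given: run in local mode, which keeps no state,
        // so every restart replays the topic from the configured start offset.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("local-test", conf, builder.createTopology());
    }
}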