I'm trying to figure out why all my Kafka messages are getting replayed every time I restart my Storm topology.
My understanding of how it should work was that once the last bolt has acked the tuple, the spout should commit the message offset on Kafka, and hence I should not see it replayed after a restart.
My code is a simple KafkaSpout and a bolt which just prints every message and then acks it.
// Imports for Storm 0.9/0.10-era storm-kafka (in Storm 1.x these live under org.apache.storm.*)
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

private static KafkaSpout buildKafkaSpout(String topicName) {
    ZkHosts zkHosts = new ZkHosts("localhost:2181");
    SpoutConfig spoutConfig = new SpoutConfig(zkHosts,
            topicName,
            "/" + topicName,   // zkRoot -- ZooKeeper path under which offsets are stored
            "mykafkaspout");   // id -- fixed consumer id /*was: UUID.randomUUID().toString()*/
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    return new KafkaSpout(spoutConfig);
}
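Two SpoutConfig details are relevant here. The zkRoot and id together determine the ZooKeeper path under which the spout commits its offsets, which is presumably why the fixed "mykafkaspout" id replaced UUID.randomUUID(): a random id means a fresh consumer state, and therefore a full replay, on every restart. Also, the spout only flushes offsets to ZooKeeper periodically (every 2000 ms by default), so even with persistence working, a quick restart can replay the last couple of seconds of acked messages. A sketch of tightening that window (stateUpdateIntervalMs is an existing SpoutConfig field; the value here is just an example):

// Commit offsets to ZooKeeper every 500 ms instead of the 2000 ms default,
// so a restart replays at most ~500 ms worth of already-acked tuples.
spoutConfig.stateUpdateIntervalMs = 500;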
public static class PrintBolt extends BaseRichBolt {
    OutputCollector _collector;
    public static Logger LOG = LoggerFactory.getLogger(PrintBolt.class);

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        LOG.error("PrintBolt.0: {}", tuple.getString(0));
        _collector.ack(tuple);   // explicitly ack every tuple
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("nothing"));
    }
}
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka", buildKafkaSpout("mytopic"), 1);
    builder.setBolt("print1", new PrintBolt(), 1).shuffleGrouping("kafka");
    // Submit the topology (assuming a LocalCluster for local testing)
    new LocalCluster().submitTopology("kafka-test", new Config(), builder.createTopology());
}
I have not provided any config settings other than those in the code.
Am I missing a config setting, or what am I doing wrong?
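One configuration detail that might matter (a sketch, assuming the topology is submitted to a LocalCluster as in the main method above): SpoutConfig has zkServers and zkPort fields that control where the spout commits its offsets. If they are left unset, my understanding is that the spout falls back to the ZooKeeper that Storm itself uses, which for a LocalCluster is an in-process instance that disappears on shutdown -- taking the committed offsets with it. Pointing the spout explicitly at the standalone ZooKeeper would look roughly like this:

// Commit offsets to the external ZooKeeper at localhost:2181 rather than
// whatever ZooKeeper the (possibly local, in-process) cluster provides.
spoutConfig.zkServers = java.util.Arrays.asList("localhost");
spoutConfig.zkPort = 2181;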
UPDATE:
To clarify: everything works fine until I restart the pipeline. The behavior described below is what I get from other (non-Storm) consumers, and what I expected from the KafkaSpout.
However, the actual behavior I'm getting with the default settings is the following: messages are processed fine until I stop the pipeline, and when I restart it I get a replay of all the messages, including those (A and B) which I believed I had already acked.
As per the configuration options mentioned by Matthias, I can change startOffsetTime to Latest; however, that is literally the latest, and the pipeline then drops any messages (message "C") that were produced while the pipeline was restarting.
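For reference, that setting looks like this (startOffsetTime is an existing KafkaConfig field; as I understand storm-kafka, it is only consulted when no committed offset is found for the consumer id, so the fact that it takes effect on every restart is itself a hint that offsets are not being persisted or read back):

// Only used when no previously committed offset exists for this consumer id:
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();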
I have a consumer written in Node.js (using npm kafka-node) which is able to ack messages to Kafka, and when I restart the Node.js consumer it does exactly what I expected (it catches up on message "C", which was produced while the consumer was down, and continues from there) -- so how do I get the same behavior with the KafkaSpout?
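In storm-kafka terms, the Node.js behavior described above should correspond to something like this combination (a sketch; ignoreZkOffsets was called forceFromStart in older storm-kafka releases):

// Resume from the last committed offset when one exists (the default), and
// fall back to the latest offset only on the very first run of this consumer id.
spoutConfig.ignoreZkOffsets = false;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();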
Comments:
(If your bolt does not emit any tuples, declareOutputFields can be empty.) - Matthias J. Sax
With LatestTime the spout now drops all messages which were sent while the spout was not running -- which is no good either -- so not a solution. - Soren