2 votes

I am using storm-kafka-0.9.3 to read data from Kafka and process it in Storm. Below is the Kafka spout I am using. The problem is that when I kill the Storm cluster and restart it, it does not read the old data that was sent while it was down; it starts reading from the latest offset.

BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOST);

SpoutConfig spoutConfig = new SpoutConfig(hosts, CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,
        "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// Should never be set to true
spoutConfig.forceFromStart = false;
spoutConfig.startOffsetTime = -2; // -2 = earliest offset (kafka.api.OffsetRequest.EarliestTime())

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
return kafkaSpout;
Can you please try commenting out the spoutConfig.forceFromStart=false; line, or setting spoutConfig.forceFromStart=true? – user2720864

Tried that, but same issue. Assume I have 100 messages in Kafka and Storm is processing them. Now assume that after the 100th message Storm goes down and my HTTP endpoint pushes 300 more messages into Kafka. Since Storm processed only 100 messages, I expect that when Storm wakes up it should start processing from message 101, where it left off. – user1249655

So what exactly is happening? In your post you mentioned it starts reading from the latest offset... isn't that what you are looking for? – user2720864

Basically, when Storm comes back it starts reading from message 401 instead of 101. – user1249655

3 Answers

2 votes

Thanks all. Since I was running the topology in local mode, Storm did not store the offset in ZooKeeper; when I ran the topology in production mode it got resolved.
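For context, a minimal sketch of that difference, with a hypothetical buildKafkaSpout() standing in for the spout construction shown in the question: in local mode Storm runs against an in-process ZooKeeper that disappears with the JVM, so the offsets the spout commits are lost on restart, whereas submitting to a real cluster commits them to the external ZooKeeper ensemble.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;

public class TopologyRunner {

    // Hypothetical stand-in for the SpoutConfig/KafkaSpout construction in the question.
    static KafkaSpout buildKafkaSpout() {
        throw new UnsupportedOperationException("wire in the SpoutConfig from the question");
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", buildKafkaSpout());
        StormTopology topology = builder.createTopology();
        Config conf = new Config();

        if (args.length > 0 && "local".equals(args[0])) {
            // Local mode: runs against an in-process ZooKeeper, so the offsets
            // the spout commits vanish when the JVM exits.
            new LocalCluster().submitTopology("kafka-topology", conf, topology);
        } else {
            // Cluster ("prod") mode: offsets are committed to the external
            // ZooKeeper ensemble and survive topology restarts.
            StormSubmitter.submitTopology("kafka-topology", conf, topology);
        }
    }
}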

Sougata

1 vote

I believe this happens because while the topology is running it keeps all its state information in ZooKeeper under the path SpoutConfig.zkRoot + "/" + SpoutConfig.id, so that in case of failure it can resume from the last offset written to ZooKeeper.

From the docs:

Important: When re-deploying a topology make sure that the settings for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.

In your case, since SpoutConfig.id is a random value (UUID.randomUUID().toString()), the spout is not able to retrieve the last committed offset.

Also from the same page:

when a topology has run once the setting KafkaConfig.startOffsetTime will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime as described above
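For completeness, forcing a re-read would look something like the lines below. Note that in storm-kafka 0.9.3 (the version in the question) the flag is still called forceFromStart; ignoreZkOffsets is the name it was given in later releases:

// Only if you deliberately want to ignore the offsets stored in ZooKeeper:
spoutConfig.ignoreZkOffsets = true; // on storm-kafka 0.9.3: spoutConfig.forceFromStart = true
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // equivalent to -2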

You could try using a static id to see whether the spout is then able to retrieve its previous offsets, as in the sketch below.
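A sketch of the spout from the question with a stable id; the string "transaction-spout" is just illustrative, any value works as long as it stays the same across redeploys:

BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOST);
// Fixed id: on redeploy the spout finds its committed offsets under
// zkRoot + "/" + id in ZooKeeper and resumes from there.
SpoutConfig spoutConfig = new SpoutConfig(hosts, CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,
        "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,
        "transaction-spout");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);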

0 votes

You need to set spoutConfig.zkServers and spoutConfig.zkPort; if they are left unset, the spout stores its offsets in the ZooKeeper instance Storm itself runs on, which in local mode is in-process and discarded on shutdown:

BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOST);
SpoutConfig spoutConfig = new SpoutConfig(hosts, CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,
        "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME, "test");

// Tell the spout explicitly where to store its consumer offsets.
spoutConfig.zkPort = Constants.ZOOKEEPER_PORT;
spoutConfig.zkServers = Constants.ZOOKEEPER_SERVERS;

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
return kafkaSpout;
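Note that in storm-kafka's SpoutConfig, zkServers is a List&lt;String&gt; and zkPort is an Integer, so Constants.ZOOKEEPER_SERVERS and Constants.ZOOKEEPER_PORT need to match those types.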