
I am using Storm 0.10 and Kafka 0.9.0.0 with storm-kafka. Whenever I run my topology on the cluster, it starts reading from the beginning, even though I am supplying zkRoot and the consumer groupId from a properties file:

kafka.zkHosts=myserver.myhost.com:2181
kafka.topic=onboarding-mail-topic
kafka.zkRoot=/kafka-storm
kafka.group.id=onboarding

Spout:

        BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
        String topicName = prop.getProperty("kafka.topic");
        String zkRoot = prop.getProperty("kafka.zkRoot");
        String groupId = prop.getProperty("kafka.group.id");

        //kafka spout conf
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);

        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

When I check ZooKeeper with ls /, it doesn't show a kafka-storm node:

[controller_epoch, controller, brokers, storm, zookeeper, kafka-manager, admin, isr_change_notification, consumers, config]

1 Answer


Finally, I figured it out. The reason is that reading messages from Kafka and writing the consumed offsets back to ZooKeeper are configured independently.

If you are running your topology on a Storm cluster, single-node or multi-node, make sure you have set the following properties in your storm.yaml file, in addition to zkHosts, zkRoot, and the consumer group id:

storm.zookeeper.servers

and

storm.zookeeper.port
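
For example, a minimal storm.yaml sketch (the host names here are placeholders for your own ZooKeeper ensemble):

    storm.zookeeper.servers:
        - "zk1.example.com"
        - "zk2.example.com"
    storm.zookeeper.port: 2181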

The best practice, though, would be to override these properties in your topology by setting the correct values when creating the KafkaSpout, like this:

        BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
        String topicName = prop.getProperty("kafka.topic");
        String zkRoot = prop.getProperty("kafka.zkRoot");
        String groupId = prop.getProperty("kafka.group.id");
        String kafkaServers = prop.getProperty("kafka.zkServers");
        String zkPort = prop.getProperty("kafka.zkPort");

        //kafka spout conf
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);

        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // tell the spout explicitly where to store its offsets; note that
        // Arrays.asList(kafkaServers) produces a single-element list, so
        // split on commas if kafka.zkServers names more than one host
        kafkaConfig.zkServers = Arrays.asList(kafkaServers);
        kafkaConfig.zkPort = Integer.valueOf(zkPort);

        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
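
With zkServers and zkPort set, the spout writes its offsets to that ensemble under the given zkRoot; assuming storm-kafka's usual <zkRoot>/<id>/<partition> layout, you should then see paths like /kafka-storm/onboarding/partition_0 in ZooKeeper.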

You can even put these values in the Config object instead. This is better, since you might want to store offset info in one ZooKeeper cluster while your topology reads messages from a completely different broker.
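
A minimal sketch, assuming your cluster lets the topology conf override these keys (the host name is a placeholder and builder is your TopologyBuilder):

    Config conf = new Config();
    // storm.zookeeper.servers / storm.zookeeper.port are what KafkaSpout
    // falls back to when SpoutConfig.zkServers/zkPort are null (see the
    // open() snippet below)
    conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("offsets-zk.example.com"));
    conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);

    StormSubmitter.submitTopology("onboarding-topology", conf, builder.createTopology());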

KafkaSpout code snippet (from its open() method) for understanding:

@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
    _collector = collector;

    Map stateConf = new HashMap(conf);
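    // prefer the ZooKeeper servers/port set on the SpoutConfig; fall back
    // to the Storm cluster's own storm.zookeeper.* settings when null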
    List<String> zkServers = _spoutConfig.zkServers;
    if (zkServers == null) {
        zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    }
    Integer zkPort = _spoutConfig.zkPort;
    if (zkPort == null) {
        zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    }
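    // offset state is kept via ZkState under the transactional.zookeeper.*
    // settings, rooted at the spout's zkRoot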
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
    _state = new ZkState(stateConf);

    _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

    // using TransactionalState like this is a hack
    int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
    if (_spoutConfig.hosts instanceof StaticHosts) {
        _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
    } else {
        _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
    }
    // ... rest of open() omitted
}
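
So if zkServers and zkPort are left null on the SpoutConfig, the spout falls back to the cluster's storm.zookeeper.servers and storm.zookeeper.port, and the offsets land in Storm's own ZooKeeper rather than under /kafka-storm on the Kafka ZooKeeper, which is why ls / there showed no kafka-storm node.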