
I am using Kafka 0.10.1.1 and Storm 1.0.2. In the Storm documentation for Kafka integration, I can see that offsets are still maintained using ZooKeeper, since we initialize the Kafka spout with the ZooKeeper servers. How can I bootstrap the spout using the Kafka servers instead? Is there an example for this? Example from the Storm docs:

    BrokerHosts hosts = new ZkHosts(zkConnString);
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

This ZooKeeper-based option works fine and consumes messages, but I was not able to see the consumer group or the Storm nodes as consumers in the Kafka Manager UI.

The alternate approach I tried is this:

    KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig();
    KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig);

    private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers);
        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID);
        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");

        String[] topics = new String[] { topicName };

        KafkaSpoutStreams kafkaSpoutStreams =
                new KafkaSpoutStreamsNamedTopics.Builder(new Fields("message"), topics).build();

        KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
                new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new TuplesBuilder(topicName)).build();

        KafkaSpoutConfig<String, String> spoutConf =
                new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build();

        return spoutConf;
    }

But this solution throws a CommitFailedException after reading a few messages from Kafka.


2 Answers


storm-kafka writes consumer information to a different location in ZooKeeper, and in a different format, than the common Kafka clients do, so you can't see it in the Kafka Manager UI.
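As a rough illustration, you can read those offsets directly from ZooKeeper: with the SpoutConfig from the question, zkRoot is "/" + topicName and the child node is the spout's id, with one JSON znode per partition. This sketch uses the plain ZooKeeper Java client; the host, topic, and spout id are placeholders, not values from your setup:

```java
import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.ZooKeeper;

public class ReadSpoutOffset {
    public static void main(String[] args) throws Exception {
        // Connect to the same ZooKeeper ensemble the spout uses (host is a placeholder).
        ZooKeeper zk = new ZooKeeper("zkhost:2181", 30000, event -> { });
        // storm-kafka stores state under zkRoot + "/" + id + "/partition_<n>".
        String path = "/" + "topicName" + "/" + "spout-id" + "/partition_0";
        byte[] data = zk.getData(path, false, null);
        // The payload is JSON containing fields such as "offset", "partition", and "topic".
        System.out.println(new String(data, StandardCharsets.UTF_8));
        zk.close();
    }
}
```

This is also why generic Kafka tooling misses it: the state lives outside the `/consumers` tree that classic Kafka consumers use.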

You can use another monitoring tool instead, such as https://github.com/keenlabs/capillary.


On your alternate approach, you're likely getting the CommitFailedException due to:

props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");

Up to Storm 2.0.0-SNAPSHOT (and since 1.0.6), KafkaConsumer autocommit is unsupported.

From the docs:

Note that KafkaConsumer autocommit is unsupported. The KafkaSpoutConfig constructor will throw an exception if the "enable.auto.commit" property is set, and the consumer used by the spout will always have that property set to false. You can configure similar behavior to autocommit through the setProcessingGuarantee method on the KafkaSpoutConfig builder.

References: