
I am running a Storm topology that reads tweets from Kafka on AWS Ubuntu Server 14.04 LTS instances. There are 4 nodes: Nimbus, a Supervisor, a Kafka-ZooKeeper node, and a ZooKeeper node (for the Storm cluster). My Storm UI is up and running and I am able to submit topologies. I have two brokers, but I'm only using the one with broker.id=0. It has tweets in it under a topic. My Kafka server is running fine too.

I created the Kafka topic like this:
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic twitter1

The thing I'm confused about is:

SpoutConfig kafkaConfig = new SpoutConfig(kafkaHosts, topicName+"-0", "/kafka", topicName+"-0");

I think my errors stem from this line. The complete code is:

import org.apache.storm.tuple.Fields;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import java.util.Arrays;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.kafka.StringScheme;

public class TwitterTopology{
    public static void main(String[] args) {
        String topicName = "twitter1";
        String topologyName = args[0];
        String kafkaIp = "xxx.31.xxx.207"; //hiding the IPs here. This is the IP for my kafka-zk node. Is this ok?
        String nimbusHost = "xxx.31.xxx.70";
        String kafkaHost = kafkaIp + ":9092";
        BrokerHosts kafkaHosts = new ZkHosts(kafkaHost);
        SpoutConfig kafkaConfig = new SpoutConfig(kafkaHosts, topicName, "/kafka", topicName);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("twitter-spout", kafkaSpout, 8);
        builder.setBolt("WordSplitterBolt", new JsonWordSplitterBolt(5)).shuffleGrouping("twitter-spout");
        builder.setBolt("IgnoreWordsBolt", new IgnoreWordsBolt()).shuffleGrouping("WordSplitterBolt");
        builder.setBolt("WordCounterBolt", new WordCounterBolt(5, 5 * 60, 50)).shuffleGrouping("IgnoreWordsBolt");
        Config config = new Config();
        config.setDebug(false);
        config.setMaxTaskParallelism(5);
        config.put(Config.NIMBUS_HOST, nimbusHost);
        config.put(Config.NIMBUS_THRIFT_PORT, 6627);
        config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
        config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(kafkaIp));
        try {
            config.setNumWorkers(20);
            config.setMaxSpoutPending(5000);
            StormSubmitter.submitTopology(topologyName, config, builder.createTopology());
        } catch (Exception e) {
            throw new IllegalStateException("Couldn't initialize the topology", e);
        }
    }
}

I am getting this exception in the Storm UI:

Unable to get offset lags for kafka. Reason: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/twitter1/partitions
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
    at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1590)
    at org.apache.curator.shaded.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:242)
    at org.apache.curator.shaded.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:231)
    at org.apache.curator.shaded.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
    at org.apache.curator.shaded.RetryLoop.callWithRetry(RetryLoop.java:100)
    at org.apache.curator.shaded.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:228)
    at org.apache.curator.shaded.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:219)
    at org.apache.curator.shaded.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:41)
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getLeadersAndTopicPartitions(KafkaOffsetLagUtil.java:319)
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:256)
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)

The "Unable to get offset lags for kafka" part of the error stays the same, while the rest of the exception changes depending on the zkRoot path I pass (the 3rd argument to SpoutConfig). I don't know exactly how to fill in these arguments so that the Kafka spout pulls the tweets from my topic.

I followed this tutorial to write the code for submitting the topology: http://stdatalabs.blogspot.ca/2016/10/real-time-stream-processing-using.html I have made numerous changes to the Maven dependencies. My pom.xml has all the dependencies for storm-core, kafka, etc., at the latest versions available in the Maven repo.


1 Answer


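ZkHosts should be given the ZooKeeper connection string, not the Kafka broker address. Your code passes kafkaIp + ":9092", which is the broker port, and that would explain the ConnectionLoss error when the spout tries to read /brokers/topics/twitter1/partitions.

If your ZooKeeper and Kafka are on the same server, keep the same IP and use the ZooKeeper port (2181) instead.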
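
As a rough sketch of what that could look like (not tested against your cluster): the small class below only builds the connection string, so it compiles without storm-kafka on the classpath, and the Storm calls it would feed are shown in comments. The helper zkConnect is a hypothetical name of mine. The "/kafka" suffix is an assumption based on the --zookeeper localhost:2181/kafka you used when creating the topic, and the zkRoot and id values are only illustrative. Note that the 3rd argument to SpoutConfig is the ZooKeeper path where the spout stores its own consumer offsets, and the 4th is an identifier for the spout, so neither needs to match the Kafka path.

```java
final class KafkaSpoutZkSettings {

    // Hypothetical helper: builds "host:port" or "host:port/chroot".
    public static String zkConnect(String host, int port, String chroot) {
        String base = host + ":" + port;
        return (chroot == null || chroot.isEmpty()) ? base : base + chroot;
    }

    public static void main(String[] args) {
        String zkIp = "xxx.31.xxx.207"; // placeholder IP of the kafka-zk node, as in the question
        String topicName = "twitter1";

        // ZooKeeper client port (2181), not the Kafka broker port (9092).
        // Assumption: Kafka registers its brokers under the "/kafka" chroot.
        String brokerZk = zkConnect(zkIp, 2181, "/kafka");
        System.out.println("ZkHosts connection string: " + brokerZk);

        // With storm-kafka on the classpath, the values would be used like this:
        //   BrokerHosts kafkaHosts = new ZkHosts(brokerZk);
        //   SpoutConfig kafkaConfig = new SpoutConfig(
        //       kafkaHosts,
        //       topicName,                 // topic to read
        //       "/" + topicName + "-offsets", // zkRoot: where the spout stores offsets (illustrative)
        //       topicName + "-spout");     // id: identifier for this spout (illustrative)
    }
}
```

If pointing the connection string at the chroot does not work in your setup, ZkHosts also has a two-argument constructor that takes the broker path separately, e.g. new ZkHosts(zkIp + ":2181", "/kafka/brokers").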

Refer to https://storm.apache.org/releases/1.2.3/storm-kafka.html for details.