0
votes

I am new to kafka and storm. I was trying to implement a java example which integrates Kafka and storm. I found an example online. I am trying to run the java program in eclipse IDE. I am not using maven.

I have storm-kafka-0.10.0.jar, kafka-0.6.jar, scala-library-2.10.3.jar and storm-core-0.10.0.jar as external jars.

Here is my java code.

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.ZkHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);

      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
    //kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
    //builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter");

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.createTopology());

      Thread.sleep(10000);

      cluster.shutdown();
   }
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;

   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);

      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }

      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {

   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

When I try to run the kafkaStormSample.java I keep getting the below error.

   Exception in thread "main" java.lang.NoSuchMethodError: kafka.api.OffsetRequest.DefaultClientId()Ljava/lang/String;
    at storm.kafka.KafkaConfig.<init>(KafkaConfig.java:43)
    at storm.kafka.SpoutConfig.<init>(SpoutConfig.java:40)
    at KafkaStormSample.main(KafkaStormSample.java:23)

I made sure I have all the required jars. But still I think I am missing jar.

Any help would be appreciated.

Thanks !

1

1 Answers

0
votes

I don't know much about those systems, but it looks to me like a library version mismatch.

One of the libraries (Storm in thids case) was compiled against a different version of kafka where that method is defined. Check your dependencies.

Is one of the reasons dependency management systems are helpful.

Update: From their documentation they provide this set up on Maven:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.1.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

Seems your kafka version is too old.