I am using KafkaSpout. Please find the test program below.
I am using Storm 0.8.1. Multischeme class is there in Storm 0.8.2. I will be using that. I just want to know how were the earlier versions working just by instantiating the StringScheme() class? Where can I download earlier versions of Kafka Spout? But I doubt that would be a correct alternative than to work on Storm 0.8.2. ??? (Confused)
When I run the code (given below) on storm cluster (i.e. when I push my topology) I get the following error (This happens when the Scheme part is commented else of course I will get compiler error as the class is not there in 0.8.1):
java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
In the code given below you may find the spoutConfig.scheme=new StringScheme(); part commented. I was getting compiler error if I don't comment that line which is but natural as there are no constructors in there. Also when I instantiate MultiScheme I get error as I dont have that class in 0.8.1.
public class TestTopology {
public static class PrinterBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.toString());
}
}
public static void main(String [] args) throws Exception {
List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("127.0.0.1",9092));
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
spoutConfig.zkServers=ImmutableList.of("localhost");
spoutConfig.zkPort=2181;
//spoutConfig.scheme=new StringScheme();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("spout",new KafkaSpout(spoutConfig));
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("spout");
Config config = new Config();
cluster.submitTopology("kafka-test", config, builder.createTopology());
Thread.sleep(600000);
}