3
votes

I am trying to rig up a Kafka-Storm "Hello World" system. I have Kafka installed and running; when I send data with the Kafka producer, I can read it with the Kafka console consumer.

I took the Chapter 02 example from the "Getting Started With Storm" O'Reilly book, and modified it to use KafkaSpout instead of a regular spout.

When I run the application with data already pending in Kafka, the KafkaSpout's nextTuple doesn't get any messages: it goes in, tries to iterate over an empty managers list under the coordinator, and exits.

My environment is a fairly old Cloudera VM, with Storm 0.9, Kafka-Storm 0.9 (the latest), and Kafka 2.9.2-0.7.0.

This is how I defined the SpoutConfig and the topology:

String zookeepers = "localhost:2181";

SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
        "gtest",       // topic
        "/kafka",      // zookeeper root path for offset storing
        "KafkaSpout"); // consumer id
spoutConfig.forceStartOffsetTime(-1);

KafkaSpoutTester kafkaSpout = new KafkaSpoutTester(spoutConfig);


//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", kafkaSpout, 1);
builder.setBolt("word-normalizer", new WordNormalizer())
    .shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
    .fieldsGrouping("word-normalizer", new Fields("word"));

//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());

Can someone please help me figure out why I am not receiving anything?

Thanks, G.

4 Answers

4
votes

If you've already consumed the messages, the spout is not supposed to read them again unless your producer produces new ones. That is because of the forceStartOffsetTime(-1) call in your code.

From the storm-contrib documentation:

Another very useful config in the spout is the ability to force the spout to rewind to a previous offset. You do forceStartOffsetTime on the spout config, like so:

   spoutConfig.forceStartOffsetTime(-2);

It will choose the latest offset written around that timestamp to start consuming. You can force the spout to always start from the latest offset by passing in -1, and you can force it to start from the earliest offset by passing in -2.

What does your producer look like? A snippet would be useful. You can replace -1 with -2 and see if you receive anything; if your producer is fine, you should be able to consume.
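Concretely, the suggested change is only the argument to forceStartOffsetTime; here is a sketch against the storm-kafka 0.9 SpoutConfig API already used in the question:

```java
// Sketch only: same SpoutConfig setup as in the question, but starting from
// the earliest offset so messages already sitting in the topic get replayed.
String zookeepers = "localhost:2181";
SpoutConfig spoutConfig = new SpoutConfig(
        new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
        "gtest",       // topic
        "/kafka",      // zookeeper root path for offset storing
        "KafkaSpout"); // consumer id

spoutConfig.forceStartOffsetTime(-2); // -2 = earliest offset (was -1 = latest)
```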

1
votes
SpoutConfig spoutConf = new SpoutConfig(...);
spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
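For reference, these "special" offset times are plain long constants: kafka.api.OffsetRequest.LatestTime() returns -1 and EarliestTime() returns -2, so this answer is equivalent to the forceStartOffsetTime values discussed above. A minimal stand-alone illustration (the constants are mirrored here rather than pulled from the Kafka jar):

```java
// Illustration only: mirrors the values returned by
// kafka.api.OffsetRequest.LatestTime() / EarliestTime().
public class OffsetTimes {
    static final long LATEST_TIME = -1L;   // consume only messages produced after startup
    static final long EARLIEST_TIME = -2L; // replay from the beginning of the log

    public static void main(String[] args) {
        System.out.println("latest=" + LATEST_TIME);
        System.out.println("earliest=" + EARLIEST_TIME);
    }
}
```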
0
votes
SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
        "gtest", // name of topic used by producer & consumer
        "/kafka",  // zookeeper root path for offset storing
        "KafkaSpout");

You are using the "gtest" topic for receiving the data. Make sure that your producer is sending data to the same topic.

And in the bolt, print the tuple like this:

public void execute(Tuple tuple, BasicOutputCollector collector) {
    System.out.println(tuple);
}

It should print the data pending in Kafka.

0
votes

I went through some grief getting Storm and Kafka integrated. These are both fast-moving and relatively young projects, so it can be hard to find working examples to jump-start your development.

To help other developers (and hopefully get others contributing useful examples that I can use as well), I started a github project to house code snippets related to Storm/Kafka (and Esper) development.

You are welcome to check it out here: https://github.com/buildlackey/cep

(click on the storm+kafka directory for a sample program that should get you up and running).