I am using Apache Kafka 0.8.2.1 to stream web events to other datasources. The Kafka Producer I wrote is working great and I'm able to see the data getting streaming through my topic when I run kafka-console-consumer.sh. However, I have had no luck whatsoever trying to get my Kafka Consumer to retrieve the messages. Any ideas?
The following error about an improper path is being output when my code attempts to run consumer.createMessageStreams(topicCountMap)
Exception in thread "main" java.lang.IllegalArgumentException: Path must not end with / character
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)
at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1024)
at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1073)
at org.I0Itec.zkclient.ZkConnection.exists(ZkConnection.java:95)
at org.I0Itec.zkclient.ZkClient$11.call(ZkClient.java:827)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:824)
at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:136)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsume
rConnector.scala:901)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsume
rConnector.scala:898)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:
898)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:240)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
Here is the code from my Kafka Consumer.
val consumer: ConsumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig())
var executor: ExecutorService = null
def run(a_numThreads: Integer) {
var topicCountMap: java.util.Map[String, Integer] = new java.util.HashMap[String, Integer]()
topicCountMap.put("testEvent", new Integer(a_numThreads))
var consumerMap = consumer.createMessageStreams(topicCountMap)
var streams = consumerMap.get("testEvent")
// now launch all the threads
executor = Executors.newFixedThreadPool(a_numThreads)
// now create an object to consume the messages
//
var threadNumber: Integer = 0
var streamsItr = streams.iterator()
while (streamsItr.hasNext()) {
var stream = streamsItr.next()
executor.submit(new EventConsumer(stream, threadNumber))
threadNumber = threadNumber + 1
}
}
def createConsumerConfig(): ConsumerConfig = {
var props: Properties = new Properties()
props.put("zookeeper.connect", "127.0.0.1:2181")
props.put("zk.connect", "127.0.0.1:2181")
props.put("group.id", "testConsumer")
props.put("groupid", "tesConsumer")
props.put("zookeeper.session.timeout.ms", "400")
props.put("zookeeper.sync.time.ms", "200")
props.put("auto.commit.interval.ms", "1000")
return new ConsumerConfig(props)
}
zookeeper.connect
&zk.connect
? Alsogroup.id
&groupid
? – user2720864