4
votes

I am using Apache Kafka 0.8.2.1 to stream web events to other datasources. The Kafka Producer I wrote is working great and I'm able to see the data streaming through my topic when I run kafka-console-consumer.sh. However, I have had no luck whatsoever trying to get my Kafka Consumer to retrieve the messages. Any ideas?

The following error about an improper path is output when my code attempts to run consumer.createMessageStreams(topicCountMap):

Exception in thread "main" java.lang.IllegalArgumentException: Path must not end with / character
        at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)
        at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1024)
        at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1073)
        at org.I0Itec.zkclient.ZkConnection.exists(ZkConnection.java:95)
        at org.I0Itec.zkclient.ZkClient$11.call(ZkClient.java:827)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:824)
        at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:136)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsumerConnector.scala:901)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsumerConnector.scala:898)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:898)
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:240)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)

Here is the code from my Kafka Consumer.

  import java.util.Properties
  import java.util.concurrent.{ExecutorService, Executors}

  import kafka.consumer.{ConsumerConfig, ConsumerConnector}

  val consumer: ConsumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig())

  var executor: ExecutorService = null

  def run(a_numThreads: Integer) {
    val topicCountMap: java.util.Map[String, Integer] = new java.util.HashMap[String, Integer]()

    topicCountMap.put("testEvent", a_numThreads)

    val consumerMap = consumer.createMessageStreams(topicCountMap)

    val streams = consumerMap.get("testEvent")
    // now launch all the threads
    executor = Executors.newFixedThreadPool(a_numThreads)

    // now create an object to consume the messages
    var threadNumber = 0
    val streamsItr = streams.iterator()
    while (streamsItr.hasNext()) {
      val stream = streamsItr.next()
      executor.submit(new EventConsumer(stream, threadNumber))
      threadNumber += 1
    }
  }

  def createConsumerConfig(): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", "127.0.0.1:2181")
    props.put("zk.connect", "127.0.0.1:2181")
    props.put("group.id", "testConsumer")
    props.put("groupid", "testConsumer")
    props.put("zookeeper.session.timeout.ms", "400")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("auto.commit.interval.ms", "1000")

    new ConsumerConfig(props)
  }
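
The threading pattern in run() above — one worker per stream submitted to a fixed-size pool — can be sketched in isolation. This is a hedged stand-alone sketch: a plain Runnable plays the role of the Kafka stream worker (EventConsumer is not shown in the question), so only the executor fan-out itself is demonstrated:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

val numThreads = 4
val latch = new CountDownLatch(numThreads)
val processed = new AtomicInteger(0)

// One task per stream, mirroring the submit loop in run()
val executor = Executors.newFixedThreadPool(numThreads)
for (threadNumber <- 0 until numThreads) {
  executor.submit(new Runnable {
    override def run(): Unit = {
      // stand-in for draining one KafkaStream in EventConsumer
      processed.incrementAndGet()
      latch.countDown()
    }
  })
}

latch.await(5, TimeUnit.SECONDS)
executor.shutdown()
```

The fixed pool size must match the count passed in topicCountMap; if the pool is smaller than the number of streams, some streams are never drained.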
This might not be the actual issue, but why do you require both zookeeper.connect & zk.connect? Also group.id & groupid? – user2720864
At some point, the Kafka team changed the configuration property names the consumer uses and didn't necessarily update the documentation to match. Unfortunately, I don't know which ones it's reading, so I've included both. – Kevin Quon
Starting from 0.8 you require zookeeper.connect & group.id. Regarding the exception, this looks like something to do with ZooKeeper, but your configuration seems okay to me. You could probably dig in and get some more details if possible; hope that leads us to some clue. – user2720864
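
If the last comment is right that the 0.8 consumer reads only the new-style keys, createConsumerConfig() could be reduced to the properties below. This is a sketch under that assumption; the timeout values are illustrative, not recommendations:

```scala
import java.util.Properties

// Only the 0.8-style keys; "zk.connect" and "groupid" were the
// pre-0.8 names and should not be needed alongside these.
val props = new Properties()
props.put("zookeeper.connect", "127.0.0.1:2181")   // required in 0.8
props.put("group.id", "testConsumer")              // required in 0.8
props.put("zookeeper.session.timeout.ms", "6000")  // illustrative value
props.put("auto.commit.interval.ms", "1000")
```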

1 Answer

1
votes

Spark's CheckpointWriter produces this exception message when it cannot access the checkpoint path it was given. Please ensure that checkpointing is either disabled or configured with a correct path. Since the exception occurs after a successful connection, in

at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)

it seems that the writer cannot get access to the directory to which the checkpoint information will be saved.

https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing
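
For reference, the message in the stack trace comes from ZooKeeper's own path validation. The trailing-slash rule it enforces can be sketched as follows — a simplified re-implementation for illustration, not ZooKeeper's actual code:

```scala
// Simplified sketch of the rules PathUtils.validatePath enforces:
// a path must start with '/' and, unless it is the root "/",
// must not end with '/'.
def checkPath(path: String): Unit = {
  require(path.nonEmpty && path.startsWith("/"),
    "Path must start with / character")
  if (path.length > 1 && path.endsWith("/"))
    throw new IllegalArgumentException("Path must not end with / character")
}

checkPath("/consumers/testConsumer")    // passes
// checkPath("/consumers/testConsumer/") would throw IllegalArgumentException
```

So any ZooKeeper path the client builds — for example one derived from a configured connect string or checkpoint directory with a trailing slash — will fail this check before the request is ever sent.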