There are quite a few answers on this topic, but none of them worked for me.

I am trying to run the following Kafka Streams processor:

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.KStreamBuilder

object simplestream extends App {

  val builder: KStreamBuilder = new KStreamBuilder

  val streamingConfig = { // TODO: move these to config
    val settings = new Properties
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "example11")
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    // Specify default (de)serializers for record keys and for record values.
    settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
    settings
  }

  // Read topic "tt2" using the default serdes configured above.
  val users = builder.stream[String, Array[Byte]]("tt2")

  // Print every record to stdout.
  users.print()

  val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
  stream.start()

}

Dependencies:

   //kafka
  "org.apache.kafka" % "kafka-streams" % "0.10.2.0",
  "org.apache.kafka" % "kafka-clients" % "0.10.2.0"

And the error:

  [error] (run-main-1) org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
    at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
    at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:323)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:349)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:272)
    at kafka.simplestream$.runStream(simplestream.scala:36)
    at kafka.simplestream$.delayedEndpoint$kafka$simplestream$1(simplestream.scala:40)
    at kafka.simplestream$delayedInit$body.apply(simplestream.scala:12)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:378)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at kafka.simplestream$.main(simplestream.scala:12)
    at kafka.simplestream.main(simplestream.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.update(Lorg/apache/kafka/common/Cluster;J)V

I've tried different client versions with no luck. I am using Kafka 0.10.2.0. I also get the following messages in the ZooKeeper log:

[2017-08-18 13:08:10,260] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:delete cxid:0x29 zxid:0x4d txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x35 zxid:0x4e txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x36 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

I'm not sure what exactly is causing it. I am able to consume/produce just fine, though.

1 Answer

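java.lang.NoSuchMethodError: this error typically happens when multiple versions of the client jar are on your classpath, so a class gets loaded from a version that lacks the method the caller was compiled against. Here kafka-streams is calling Metadata.update(Cluster, long), and the kafka-clients jar that was actually loaded does not have a method with that signature. Check your classpath for a second kafka-clients jar (for example, one pulled in transitively by another dependency) and make sure only the version matching kafka-streams remains.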
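One way to check the classpath from inside the application is to ask the JVM where a class was loaded from. The following is a minimal sketch, not part of Kafka: `JarLocator` and `locationOf` are hypothetical names made up for illustration, and the only APIs used are `Class.forName` and the standard `ProtectionDomain`/`CodeSource` accessors.

```scala
import scala.util.Try

// Hypothetical helper (not a Kafka API): reports where a class was loaded from.
object JarLocator {

  /** Returns the jar or directory URL the named class was loaded from.
    * Returns None if the class cannot be found, or if the JVM does not
    * record a code source for it (as is the case for core JDK classes).
    */
  def locationOf(className: String): Option[String] =
    for {
      cls    <- Try(Class.forName(className)).toOption
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString
}

// Example call, assuming kafka-clients is on the classpath:
//   println(JarLocator.locationOf("org.apache.kafka.clients.Metadata"))
```

Run inside the failing application, the example call should print the kafka-clients jar that is actually in use. If its version is not 0.10.2.0, that jar is the one to exclude or override in the build, for instance with sbt's `dependencyOverrides` setting.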

The KeeperException messages logged by ZooKeeper are not an issue. They are INFO-level messages produced when the broker tries to create nodes (paths) that already exist in ZooKeeper, or to delete ones that do not exist.