There are quite a few answers on this topic, but none of them worked for me.
I am trying to run the following Kafka Streams processor:
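import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}

object simplestream extends App {
  val builder: KStreamBuilder = new KStreamBuilder

  val streamingConfig = { // ToDo - Move these to config
    val settings = new Properties
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "example11")
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    // Specify default (de)serializers for record keys and for record values.
    settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
    settings
  }

  // Read the "tt2" topic with String keys and byte-array values, matching the serdes above.
  val users: KStream[String, Array[Byte]] = builder.stream[String, Array[Byte]]("tt2")
  users.print()

  val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
  stream.start()
}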
Dependencies:
//kafka
"org.apache.kafka" % "kafka-streams" % "0.10.2.0",
"org.apache.kafka" % "kafka-clients" % "0.10.2.0"
And the error:
[error] (run-main-1) org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:323)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:349)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:272)
at kafka.simplestream$.runStream(simplestream.scala:36)
at kafka.simplestream$.delayedEndpoint$kafka$simplestream$1(simplestream.scala:40)
at kafka.simplestream$delayedInit$body.apply(simplestream.scala:12)
at scala.Function0.apply$mcV$sp(Function0.scala:34)
at scala.Function0.apply$mcV$sp$(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App.$anonfun$main$1$adapted(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:378)
at scala.App.main(App.scala:76)
at scala.App.main$(App.scala:74)
at kafka.simplestream$.main(simplestream.scala:12)
at kafka.simplestream.main(simplestream.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.update(Lorg/apache/kafka/common/Cluster;J)V
I've tried different client versions, with no luck. I am using Kafka 0.10.2.0. I also get the errors below in the ZooKeeper log:
[2017-08-18 13:08:10,260] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:delete cxid:0x29 zxid:0x4d txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x35 zxid:0x4e txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x36 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
I am not sure what exactly is causing this. I am able to consume and produce messages just fine, though.
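
One way to narrow this down: a NoSuchMethodError usually means a class was compiled against one version of a library while a different version is loaded at run time. The following is a minimal diagnostic sketch, not a fix. It prints the jar or directory a class was loaded from, using only standard JDK reflection calls. The object name ClasspathCheck and the helper locationOf are hypothetical names chosen for illustration; the default class name is taken from the "Caused by" line of the stack trace.

```scala
object ClasspathCheck {
  // Returns the jar or directory a class was loaded from, if the JVM exposes it.
  // Classes from the bootstrap class loader (e.g. java.lang.String) have no code source.
  def locationOf(cls: Class[_]): Option[String] =
    for {
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString

  def main(args: Array[String]): Unit = {
    // Assumption: the class of interest is the one named in the NoSuchMethodError.
    // Pass a different fully qualified class name as the first argument to check another class.
    val className = args.headOption.getOrElse("org.apache.kafka.clients.Metadata")
    try {
      // Load without running static initializers; only the origin of the class matters here.
      val cls = Class.forName(className, false, getClass.getClassLoader)
      println(s"$className loaded from: ${locationOf(cls).getOrElse("<no code source>")}")
    } catch {
      case _: ClassNotFoundException =>
        println(s"$className is not on the classpath")
    }
  }
}
```

Running this with the same classpath as the streams application should show whether the Metadata class comes from the expected kafka-clients 0.10.2.0 jar or from some other jar.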