I am trying to use the Confluent Platform and make high-level Kafka requests to the REST endpoint, using this code as an example.
I use the following Kafka parameters:
val kafkaParams = Map(
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://localhost:8081",
  "group.id" -> "EventConsumer",
  "auto.offset.reset" -> "smallest"
)
When I run the code, I get the error shown below. It occurs at this line:
@transient val kafkaStream: DStream[(String, Object)] =
  KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
    ssc, kafkaParams, Set(topic)
  )
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:98)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at kafka.EventsConsumer$.delayedEndpoint$kafka$EventsConsumer$1(EventsConsumer.scala:53)
    at kafka.EventsConsumer$delayedInit$body.apply(EventsConsumer.scala:22)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at kafka.EventsConsumer$.main(EventsConsumer.scala:22)
    at kafka.EventsConsumer.main(EventsConsumer.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
UPDATE:
I tried changing localhost to the IP address, but I still get the same error.
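
For reference, here is a minimal sketch of how the broker's reachability could be checked independently of Spark. It assumes the broker listens on localhost:9092, as in the parameters above. `BrokerCheck`, `canConnect` and `parseHostPort` are hypothetical names used only for illustration and are not part of Kafka or Spark.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object BrokerCheck {
  // Hypothetical helper: returns true if a TCP connection to host:port
  // can be opened within timeoutMs milliseconds, false otherwise.
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess

  // Splits a single "host:port" entry (as used in bootstrap.servers).
  def parseHostPort(entry: String): (String, Int) = {
    val idx = entry.lastIndexOf(':')
    (entry.substring(0, idx), entry.substring(idx + 1).toInt)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: same address as in kafkaParams above.
    val (host, port) = parseHostPort("localhost:9092")
    println(s"$host:$port reachable: ${canConnect(host, port)}")
  }
}
```

A successful connection here only shows that the port is open. As far as I understand, the direct stream also contacts the partition leader at whatever address the broker advertises, so this check alone would not rule out a mismatch there.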