Background: I am using a Spark Streaming application written in Scala together with Kafka. My intention is to pipe a file into Kafka and have the Spark Streaming application consume those messages from the topic to do some analysis.
Issue: However, when I pipe the file to Kafka and start my streaming application listening on that topic, I see the error messages below on the Kafka producer console.
Command used to read the file:
C:\Kafka\bin\windows>kafka-console-producer --broker-list localhost:9092 --topic mytopic2 < C:\somefile.csv
ERROR (the same two lines repeat for every expired batch):
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 with key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s) expired due to timeout while requesting metadata from brokers for mytopic2-0
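In case it helps: the exception points at a metadata request for mytopic2-0 timing out, so I assume the first things to verify would be that the topic exists and that the broker is actually listening on port 9092. The commands below are what I would run for that (the ZooKeeper address is an assumption based on the default local setup, not something from my original configuration):
C:\Kafka\bin\windows>kafka-topics --zookeeper localhost:2181 --describe --topic mytopic2
C:\>netstat -an | findstr "9092"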
I am running this application locally on my Windows machine, and the Kafka server also runs locally on the same machine.
The Spark application looks something like:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = Set("mytopic2")
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
lines.foreachRDD((rdd, time) => {
  // do something with each batch
})
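For completeness, a minimal self-contained version of the application looks roughly like the sketch below. The object name, the local[*] master, and the 10-second batch interval are illustrative placeholders rather than my exact settings; the Kafka pieces assume the Spark 1.x spark-streaming-kafka (Kafka 0.8) connector that createDirectStream with StringDecoder comes from.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaStreamingApp {
  def main(args: Array[String]): Unit = {
    // Placeholder master and app name; the 10-second batch interval is illustrative.
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Direct (receiver-less) stream against the local broker; the console
    // producer sends null keys, so only the message value is kept.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("mytopic2")
    val lines = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      .map(_._2)

    lines.foreachRDD((rdd, time) => {
      // Stand-in for the real analysis: just count the records per batch.
      println(s"Batch at $time: ${rdd.count()} records")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}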
I am not sure what this TimeoutException actually means with respect to Kafka/Spark, i.e. why the producer's metadata requests for mytopic2-0 would expire against a broker running on the same machine.
Any pointers on how to proceed would be appreciated.