
Background: I am using a Spark Streaming program written in Scala with Kafka. My intention is to read a file into Kafka and publish those messages to the Spark Streaming application for some analysis.

Issue: When I pipe the file to Kafka and start my streaming application to listen on that topic, I see these error messages on the Kafka producer console.

Command used to read the file:

C:\Kafka\bin\windows>kafka-console-producer --broker-list localhost:9092 --topic mytopic2 < C:\somefile.csv

ERROR:

[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 with key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s) expired due to timeout while requesting metadata from brokers for mytopic2-0

(the same two lines repeat for every failed batch)

I am running both the application and the Kafka server locally on my Windows machine.

The Spark application looks something like:

import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = Set("mytopic2")
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

lines.foreachRDD((rdd, time) => {
  // do something with each batch
})
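For reference, ssc is the StreamingContext; a minimal setup around the snippet would look roughly like this (the app name, master and 10-second batch interval here are illustrative, not my exact values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative setup: local mode with two worker threads, 10-second batches.
val conf = new SparkConf().setAppName("KafkaStreamApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

// ... create the direct stream and call foreachRDD as above ...

ssc.start()
ssc.awaitTermination()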

I am not sure what this error actually means with respect to Kafka/Spark.

Any pointers to proceed would be appreciated.


2 Answers


This error has nothing to do with Spark/Spark Streaming; it looks like there is a problem with your Kafka setup.

The timeout error generally occurs when there is an issue with the ZooKeeper setup. Have you configured ZooKeeper correctly? Make sure it is set up properly. Also, try first running the simple producer and consumer scripts that ship with Kafka, as sketched below.
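For example, a broker sanity check on Windows might look like this, each command in its own console window (paths follow the layout from the question; the config file locations are assumptions, adjust them to your installation):

C:\Kafka\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties
C:\Kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties
C:\Kafka\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic mytopic2
C:\Kafka\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 --topic mytopic2 --from-beginning

If lines typed into the producer window show up in the consumer window, the broker and ZooKeeper are working; if not, the problem lies in the Kafka setup rather than in Spark.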


This was an issue with Kafka after all. I suspect it had more to do with the Kafka version I had downloaded for use with Spark Streaming, and less to do with the Kafka setup per se.

I had downloaded Kafka 0.10.0.x to use with Spark Streaming 1.6.2, and that was when I was getting the TimeoutException. I then found this link: https://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#linking, which states: "Kafka: Spark Streaming 1.6.2 is compatible with Kafka 0.8.2.1."

So, when I downloaded Kafka 0.8.2.1, it worked fine and I don't get the timeout errors anymore.
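The same compatibility constraint applies to the build file. A minimal SBT sketch for Spark 1.6.2 with the 0.8-style direct stream might look like this (the artifact names are the standard Spark ones; the Scala version and the "provided" scope are assumptions):

// build.sbt -- pin Spark Streaming and its Kafka connector to matching versions.
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2"
)

The spark-streaming-kafka 1.6.2 artifact is built against the Kafka 0.8.2.1 client, so the broker you run against should be on the matching 0.8.x line.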