I have run into some issues while trying to consume messages from Kafka with a Spark Streaming application in a Kerberized Hadoop cluster. I tried both approaches described in the Spark Streaming + Kafka integration guide:
- receiver-based approach: KafkaUtils.createStream
- direct approach (no receivers): KafkaUtils.createDirectStream
The receiver-based approach (KafkaUtils.createStream) throws two different exceptions, depending on whether I run in local mode (--master local[*]) or in YARN mode (--master yarn --deploy-mode client):
- a weird kafka.common.BrokerEndPointNotAvailableException in a Spark local application
- a ZooKeeper timeout in a Spark on YARN application. I once managed to make this work (connecting to ZooKeeper successfully), but no messages were received.
In both modes (local or YARN), the direct approach (KafkaUtils.createDirectStream) throws an unexplained EOFException (see details below).
My final goal is to launch a Spark Streaming job on YARN, so I will leave the Spark local job aside.
Here is my test environment:
- Cloudera CDH 5.7.0
- Spark 1.6.0
- Kafka 0.10.1.0
I'm working on a single-node cluster (hostname = quickstart.cloudera) for testing purposes. For those interested in reproducing the tests, I'm working in a custom Docker container based on cloudera/quickstart (Git repo).
Below is the sample code I used in spark-shell. This code does work when Kerberos is not enabled: messages produced by kafka-console-producer are received by the Spark application.
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import kafka.serializer.StringDecoder

// sc is the SparkContext provided by spark-shell; batches every 5 seconds
val ssc = new StreamingContext(sc, Seconds(5))
// topic name -> number of consumer threads (used by the receiver-based approach)
val topics = Map("test-kafka" -> 1)

// Receiver-based approach: connects through ZooKeeper
def readFromKafkaReceiver(): Unit = {
  val kafkaParams = Map(
    "zookeeper.connect" -> "quickstart.cloudera:2181",
    "group.id" -> "gid1",
    "client.id" -> "cid1",
    "zookeeper.session.timeout.ms" -> "5000",
    "zookeeper.connection.timeout.ms" -> "5000"
  )
  val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_2)
  stream.print()
}

// Direct approach (no receivers): talks to the brokers directly
def readFromKafkaDirectStream(): Unit = {
  val kafkaDirectParams = Map(
    "bootstrap.servers" -> "quickstart.cloudera:9092",
    "group.id" -> "gid1",
    "client.id" -> "cid1"
  )
  val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaDirectParams, topics.map(_._1).toSet)
  directStream.print()
}

readFromKafkaReceiver() // or readFromKafkaDirectStream()
ssc.start()
Thread.sleep(20000) // let a few batches run
ssc.stop(stopSparkContext = false, stopGracefully = true)
With Kerberos enabled, this code does not work. I followed the Configuring Kafka Security guide and created two configuration files:
jaas.conf:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/simpleuser/simpleuser.keytab"
  principal="simpleuser@CLOUDERA";
};
client.properties:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
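Since these two settings also have to be passed to Spark as Kafka parameters, one option is to parse the properties text into a Scala Map instead of retyping it. A minimal sketch, assuming the client.properties contents shown above; the object name ClientPropertiesToMap is hypothetical, and the text is inlined rather than read from disk so that the example runs on its own:

```scala
import java.io.StringReader
import java.util.Properties
import scala.collection.JavaConverters._

// Hypothetical helper: turns Java properties text into a Map usable as Kafka params.
object ClientPropertiesToMap {
  // Contents of client.properties, inlined so the sketch runs without the file.
  val clientProperties: String =
    """security.protocol=SASL_PLAINTEXT
      |sasl.kerberos.service.name=kafka
      |""".stripMargin

  // Parse properties text and return an immutable Map[String, String].
  def toKafkaParams(text: String): Map[String, String] = {
    val props = new Properties()
    props.load(new StringReader(text))
    props.stringPropertyNames().asScala.map(k => k -> props.getProperty(k)).toMap
  }

  def main(args: Array[String]): Unit =
    println(toKafkaParams(clientProperties))
}
```

To read the real file instead, a FileReader on the file path can replace the StringReader.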
I can produce messages with:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-producer \
--broker-list quickstart.cloudera:9092 \
--topic test-kafka \
--producer.config client.properties
However, I can't consume those messages from a Spark Streaming application. To launch spark-shell in yarn-client mode, I created a new JAAS configuration (jaas_with_zk_yarn.conf) with an additional ZooKeeper section (Client), in which the keytab is referenced only by its file name (the keytab itself is passed through the --keytab option):
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="simpleuser.keytab"
  principal="simpleuser@CLOUDERA";
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="simpleuser.keytab"
  principal="simpleuser@CLOUDERA";
};
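Both sections use the same login module and options and differ only in their name (the Kafka client reads the KafkaClient section, the ZooKeeper client reads the Client section), so the file can also be generated to keep them in sync. A minimal sketch using only the Scala standard library; JaasConfig and its methods are hypothetical names:

```scala
// Hypothetical generator for a JAAS file with identical Kerberos keytab sections.
object JaasConfig {
  // Render one login section using Krb5LoginModule with a keytab.
  def section(name: String, keyTab: String, principal: String): String =
    s"""$name {
       |  com.sun.security.auth.module.Krb5LoginModule required
       |  useKeyTab=true
       |  keyTab="$keyTab"
       |  principal="$principal";
       |};
       |""".stripMargin

  // KafkaClient is used by the Kafka client, Client by the ZooKeeper client.
  def render(keyTab: String, principal: String): String =
    Seq("KafkaClient", "Client").map(section(_, keyTab, principal)).mkString

  def main(args: Array[String]): Unit =
    print(render("simpleuser.keytab", "simpleuser@CLOUDERA"))
}
```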
This new file is passed with the --files option:
spark-shell --master yarn --deploy-mode client \
--num-executors 2 \
--files /home/simpleuser/jaas_with_zk_yarn.conf \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
--keytab /home/simpleuser/simpleuser.keytab \
--principal simpleuser
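The two extraJavaOptions values refer to the JAAS file by its base name because YARN places files passed with --files in each container's working directory under that name. A minimal sketch of how those arguments relate to the local JAAS path; SparkJaasOptions is a hypothetical helper, not part of Spark:

```scala
import java.nio.file.Paths

// Hypothetical helper that derives the spark-shell JAAS arguments from a local path.
object SparkJaasOptions {
  // Executors see the shipped file in their working directory under its base name.
  def loginConfigOption(localJaasPath: String): String = {
    val baseName = Paths.get(localJaasPath).getFileName.toString
    s"-Djava.security.auth.login.config=$baseName"
  }

  // The --files and --conf arguments from the spark-shell command.
  def confArgs(localJaasPath: String): Seq[String] = {
    val opt = loginConfigOption(localJaasPath)
    Seq(
      "--files", localJaasPath,
      "--conf", s"spark.executor.extraJavaOptions=$opt",
      "--conf", s"spark.driver.extraJavaOptions=$opt"
    )
  }

  def main(args: Array[String]): Unit =
    println(confArgs("/home/simpleuser/jaas_with_zk_yarn.conf").mkString(" "))
}
```

In yarn-client mode the driver runs in the local spark-shell process rather than in a YARN container, so for the driver the relative name is resolved against the directory spark-shell was started from.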
I used the same code as before, except that I added two Kafka parameters corresponding to the contents of the client.properties file:
"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka"
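Adding the two security parameters to an existing parameter map can be sketched as follows; the base values are the ones from readFromKafkaDirectStream(), and SecuredKafkaParams is a hypothetical name used only for illustration:

```scala
// Hypothetical illustration of merging security settings into existing Kafka params.
object SecuredKafkaParams {
  // Parameters used without Kerberos (same values as readFromKafkaDirectStream()).
  val directParams: Map[String, String] = Map(
    "bootstrap.servers" -> "quickstart.cloudera:9092",
    "group.id" -> "gid1",
    "client.id" -> "cid1"
  )

  // The two extra parameters, matching client.properties.
  val securityParams: Map[String, String] = Map(
    "security.protocol" -> "SASL_PLAINTEXT",
    "sasl.kerberos.service.name" -> "kafka"
  )

  // ++ keeps every base entry and adds (or overrides) the security entries.
  def withSecurity(base: Map[String, String]): Map[String, String] = base ++ securityParams

  def main(args: Array[String]): Unit =
    withSecurity(directParams).toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
}
```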
readFromKafkaReceiver() throws the following error once the StreamingContext is started (it cannot connect to ZooKeeper):
ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:191)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:139)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:156)
at kafka.consumer.Consumer$.create(ConsumerConnector.scala:109)
at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Sometimes the connection to ZooKeeper is established (no timeout is reached), but then no messages are received.
readFromKafkaDirectStream() throws the following error as soon as the method is called:
org.apache.spark.SparkException: java.io.EOFException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.readFromKafkaDirectStream(<console>:47)
There is no further explanation, just an EOFException. I presume there is a communication problem between Spark and the Kafka broker, but I have no further details. I also tried metadata.broker.list instead of bootstrap.servers, without success.
Maybe I'm missing something in the JAAS configuration files or in the Kafka parameters? Maybe the Spark options (extraJavaOptions) are invalid? I have tried so many possibilities that I'm a little lost.
I would be glad if someone could help me fix at least one of these problems (direct approach or receiver-based). Thanks :)