2 votes

I'm having problems running Spark Streaming with Kafka on my CDH 5 cluster, using the following command:

spark-submit --master yarn --deploy-mode client \
    --class org.apache.spark.examples.streaming.KafkaWordCount \
    /usr/lib/spark/examples/lib/spark-examples-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar \
    zk1,zk2,zk3 group topic 1

Please note that the real job has to run in client mode, hence the --deploy-mode setting. Executing the above command results in the following exception (driver side):

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:66)
at org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
at org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 12 more

kafka.serializer.StringDecoder is of course present in the spark-examples jar. Placing the jar on the Hadoop classpath also solves the problem, but I'm looking for a better (easier to maintain) solution, or at least some explanation as to why a job cannot find classes contained in the very jar it runs from :)
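
One quick way to confirm the class really is in that jar (using the jar path from the command above):

jar tf /usr/lib/spark/examples/lib/spark-examples-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar \
    | grep kafka/serializer/StringDecoder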

Any ideas? Thanks!

Some additional information:

  • Other Spark examples run just fine (e.g. SparkPi)
  • Hadoop version is 2.6.0-cdh5.7.0
  • Spark version is 1.6.0
  • yarn classpath: /etc/hadoop/conf:/etc/hadoop/conf:/etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/.//*:/usr/lib/hadoop-yarn/.//*:/usr/lib/hadoop-yarn/lib/*
Did you make sure to include spark-streaming-kafka in your uber JAR? - Yuval Itzchakov
Yes, it is included in the uber jar. Note that the jar ships with the Spark distribution and was not built by me. Also, putting it on the classpath works, so it must contain all the dependencies. I've tried creating jars with other external dependencies, such as spray-json, and they work fine. - Michał Wyrwalski
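
For reference, a per-job variant of the classpath workaround discussed above would be passing the Kafka client jar via --jars (sketch only; /path/to/kafka_2.10-0.8.2.1.jar is a placeholder for wherever that jar lives on the submitting machine):

spark-submit --master yarn --deploy-mode client \
    --jars /path/to/kafka_2.10-0.8.2.1.jar \
    --class org.apache.spark.examples.streaming.KafkaWordCount \
    /usr/lib/spark/examples/lib/spark-examples-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar \
    zk1,zk2,zk3 group topic 1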

1 Answer

0 votes

It turns out that Spark 1.6 requires Kafka 0.8.2 and I had 0.8.1 installed. After upgrading, everything worked like a charm :)
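
For anyone hitting the same mismatch: one way to pull in a matching Kafka client per job, without touching the cluster's installed jars, is Spark's --packages flag (sketch; assumes the submitting machine can resolve artifacts from Maven Central or a local Ivy cache):

spark-submit --master yarn --deploy-mode client \
    --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
    --class org.apache.spark.examples.streaming.KafkaWordCount \
    /usr/lib/spark/examples/lib/spark-examples-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar \
    zk1,zk2,zk3 group topic 1

spark-streaming-kafka_2.10:1.6.0 should transitively pull a 0.8.2.x Kafka client, matching the version requirement described above.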