2 votes

I am trying to run the Spark Streaming example DirectKafkaWordCount.scala.
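For reference, the core of that example looks roughly like this (paraphrased from the Spark 2.1.0 examples; it uses the spark-streaming-kafka-0-8 direct-stream API):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object DirectKafkaWordCount {
      def main(args: Array[String]): Unit = {
        val Array(brokers, topics) = args  // e.g. "localhost:9092", "xyz123"
        val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // one direct stream over all listed topics, reading from the given brokers
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics.split(",").toSet)

        // classic word count over the message values
        val wordCounts = messages.map(_._2)
          .flatMap(_.split(" "))
          .map(word => (word, 1L))
          .reduceByKey(_ + _)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }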

To create the jar I am using "build.sbt" with the plugin:

    name := "Kafka Direct"

    version := "1.0"

    scalaVersion := "2.11.6"

    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0",
      "org.apache.spark" % "spark-streaming_2.11" % "2.1.0",
      "org.apache.spark" % "spark-core_2.11" % "2.1.0" exclude("com.esotericsoftware.minlog", "minlog")
    )

    resolvers ++= Seq(
      "Maven Central" at "https://repo1.maven.org/maven2/"
    )



    assemblyMergeStrategy in assembly := {
      case m if m.toLowerCase.endsWith("manifest.mf")             => MergeStrategy.discard
      case m if m.toLowerCase.matches("meta-inf.*\\.sf$")         => MergeStrategy.discard
      case "log4j.properties"                                     => MergeStrategy.discard
      case m if m.toLowerCase.startsWith("meta-inf/services/")    => MergeStrategy.filterDistinctLines
      case "reference.conf"                                       => MergeStrategy.concat
      case PathList(ps @ _*) if ps.last endsWith "pom.properties" => MergeStrategy.discard
      // the catch-all must come last, otherwise the cases after it are unreachable
      case _                                                      => MergeStrategy.first
    }
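For completeness, the plugin itself is enabled in project/assembly.sbt; a minimal sketch (the exact sbt-assembly version here is an assumption, any recent 0.14.x release should work):

    // project/assembly.sbt — enables sbt-assembly (version assumed)
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")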

"sbt package" is successful but when I want to submit with from target/scala-2.11/classes directory: "spark-submit --class org.apache.spark.examples.streaming.DirectKafkaWordCount --master local[2] /home/hadoop/DirectKafkaProject/target/scala-2.11/kafka-direct_2.11-1.0.jar localhost:9092 xyz123"

it gives me this error:

    Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

I have already set SPARK_CLASSPATH and SPARK_LOCAL_IP. I have also tried the --jars option, but it asks for another .jar file and then keeps asking for further .jar files. I have done everything this site suggested, but I am not able to solve my problem.

Scala version: 2.11.6, Spark version: 2.1.0, Kafka version: 2.11-0.10.2.0.

Please help me. Thanks.

You need to add the Kafka jar, the Kafka utils jar, and the metrics-core jar to the Spark classpath. – ROOT

My Spark classpath is set with Spark's jars and the Kafka jars. – Angshusuri

I tried this: "spark-submit --class org.apache.spark.examples.streaming.DirectKafkaWordCount --jars /home/hadoop/jars/spark-streaming-kafka-0-8_2.11-2.1.0.jar,/home/hadoop/jars/kafka-clients-0.10.2.0.jar,/home/hadoop/jars/metrics-core-2.2.0.jar,/home/hadoop/jars/kafka_2.11-0.10.2.0.jar --master local[2] /home/hadoop/DirectKafkaProject/target/scala-2.11/kafka-direct_2.11-1.0.jar localhost:9092 MyTest1". But now it shows "Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker". So please help me. Thanks. – Angshusuri

But the last command shows me the exception "Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker". Thanks. – Angshusuri

Are you sure that Kafka on your machine is up and running? Also, try adding the spark-streaming-kafka jar to both the Spark and Kafka classpaths and try again. – ROOT

2 Answers

2 votes

tl;dr sbt assembly and spark-submit

sbt assembly assembles all the dependencies into one single jar file, and given that you use the external Kafka library, that's what I'd recommend.

And you've already got the sbt-assembly plugin definition in build.sbt, which suggests that's the way to go.
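First build the uber-jar from the project root:

    sbt assembly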


Then execute the assembled Spark application using the spark-submit command-line tool as follows:

bin/spark-submit \
  --class org.apache.spark.examples.streaming.DirectKafkaWordCount \
  --master local[2] \
  target/scala-2.11/StreamKafkaProg-assembly-1.0.jar localhost:9092 rat123
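(If the jar name differs, adjust the path to whatever sbt assembly prints for your project; by default the assembly jar is named from the name and version settings in build.sbt, i.e. <name>-assembly-<version>.jar.)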
1 vote

Just in case someone stumbles on this post (like I did): the answer by Jacek is very much correct, but for me it still was not working, because the merge strategy is crucial (and I had used some examples from the internet). The simplest form that works for me (assuming you only have spark, spark-sql-kafka, etc.) is

assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x => (assemblyMergeStrategy in assembly).value(x)
}
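Beyond the merge strategy, a common way to avoid many of these conflicts in the first place is to mark Spark itself as "provided", since spark-submit supplies it at runtime; only the Kafka integration then needs to be bundled. A sketch against the asker's dependencies (versions copied from the question):

    // Spark core/streaming marked "provided": spark-submit supplies them at runtime,
    // so only the Kafka integration (and its transitive deps) lands in the fat jar
    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0",
      "org.apache.spark" % "spark-streaming_2.11" % "2.1.0" % "provided",
      "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided"
    )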