
My Spark application fails to run on an AWS EMR cluster. I noticed that this is because some classes are loaded from the classpath set by EMR rather than from my application jar. For example:

java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;Ljava/lang/Object;)V
        at com.sksamuel.avro4s.SchemaFor$.fieldBuilder(SchemaFor.scala:424)
        at com.sksamuel.avro4s.SchemaFor$.fieldBuilder(SchemaFor.scala:406)

Here org.apache.avro.Schema is loaded from "jar:file:/usr/lib/spark/jars/avro-1.7.7.jar!/org/apache/avro/Schema.class"

com.sksamuel.avro4s, however, depends on Avro 1.8.1. My application is built as a fat jar and includes Avro 1.8.1, so why isn't that version loaded instead of the 1.7.7 from the classpath EMR sets?

This is just one example; I see the same problem with other libraries I include in my application. Maybe Spark itself depends on Avro 1.7.7 and I'd have to shade my own copies of conflicting dependencies. But why aren't the classes included in my app jar loaded first?
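For reference, this is how the source jar of a class can be checked at runtime (a minimal sketch using the standard JVM API; it can be run from spark-shell on the cluster):

import org.apache.avro.Schema

// Ask the JVM which code source (jar) this class was actually loaded from.
val location = classOf[Schema].getProtectionDomain.getCodeSource.getLocation
println(location)  // e.g. file:/usr/lib/spark/jars/avro-1.7.7.jar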


2 Answers


After a bit of reading I realized that this is simply how class loading works in Spark: the system classpath, including the jars EMR places under /usr/lib/spark/jars, takes precedence over the application jar. There is a hook to change this behavior, spark.executor.userClassPathFirst, but it didn't quite work when I tried it, and it's marked as experimental. I guess the best way to proceed is to shade the conflicting dependencies. Given the number of libraries Spark and its components pull in, that can mean a lot of shading for complicated Spark apps.
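For completeness, here is how the flag can be set (there is a matching spark.driver.userClassPathFirst for the driver side; both are real Spark settings but documented as experimental):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Prefer classes from the user-provided jar over Spark's own classpath.
// Both settings are marked experimental in the Spark documentation.
val conf = new SparkConf()
  .set("spark.executor.userClassPathFirst", "true")
  .set("spark.driver.userClassPathFirst", "true")  // takes effect in cluster deploy mode

val spark = SparkSession.builder().config(conf).getOrCreate()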


I had the same exception as you. Based on a recommendation, I was able to resolve it by shading the Avro dependency, as you suggested:

assemblyShadeRules in assembly := Seq( ShadeRule.rename("org.apache.avro.**" -> "latest_avro.@1").inAll )
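Here the ** wildcard matches the rest of the package path and @1 substitutes that match back in, so for example org.apache.avro.Schema is rewritten to latest_avro.Schema inside the assembled jar (this is standard sbt-assembly ShadeRule pattern syntax).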

If it helps, here is my full build.sbt (project info aside):

val sparkVersion = "2.1.0"
val confluentVersion = "3.2.1"

resolvers += "Confluent" at "http://packages.confluent.io/maven"

libraryDependencies ++= Seq(
  "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
  "org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" excludeAll ExclusionRule(organization = "org.scala-lang"),
  "org.apache.avro" % "avro" % "1.8.1" % "provided",
  "com.databricks" %% "spark-avro" % "3.2.0",
  "com.sksamuel.avro4s" %% "avro4s-core" % "1.6.4",
  "io.confluent" % "kafka-avro-serializer" % confluentVersion
)

logBuffered in Test := false

// Rename packages that collide with the versions already on Spark's classpath,
// so the fat jar's copies can no longer be shadowed
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("shapeless.**" -> "new_shapeless.@1").inAll,
  ShadeRule.rename("org.apache.avro.**" -> "latest_avro.@1").inAll
)

// Discard META-INF entries (manifests, signature files) that routinely collide
// when merging many jars; defer everything else to the default strategy
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}