We have written unit tests for Spark, running in local mode with 4 threads.

When launched one by one, for example through IntelliJ or sbt testOnly, each test runs fine.

When launched with sbt test, they fail with errors like:

[info] java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.execution.datasources.csv.CSVFileFormat not a subtype

We notably upgraded sbt and Spark to the latest versions and tried running with fork in test := true in build.sbt, but this didn't help.

Spark is at version 2.4.3, sbt at 1.2.8, and Scala at 2.12.8.

The sbt config is nothing special:

libraryDependencies ++= Seq(
  Dependencies.Test.concordion,
  Dependencies.`spark-sql` exclude("org.slf4j","slf4j-log4j12"),
  Dependencies.`better-files`
)

fork in test := true


dependencyOverrides += "com.google.guava" % "guava" % "11.0.2" 
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7.1"

We're using an sbt project with multiple sub-projects, defined this way:

scalacOptions in ThisBuild ++= Seq(
  "-encoding", "UTF-8", // source files are in UTF-8
  "-deprecation", // warn about use of deprecated APIs
  "-Yrangepos", // use range positions for syntax trees
  "-language:postfixOps", //  enables postfix operators
  "-language:implicitConversions", // enables defining implicit methods and members
  "-language:existentials", // enables writing existential types
  "-language:reflectiveCalls", // enables reflection
  "-language:higherKinds", // allow higher kinded types without `import scala.language.higherKinds`
  "-unchecked", // warn about unchecked type parameters
  "-feature", // warn about misused language features
  /*"-Xlint",               // enable handy linter warnings
    "-Xfatal-warnings",     // turn compiler warnings into errors*/
  "-Ypartial-unification" // allow the compiler to unify type constructors of different arities
)

autoCompilerPlugins := true

addCompilerPlugin(Dependencies.`kind-projector`)
addCompilerPlugin(Dependencies.`better-monadic-for`)


// Define the root project, and make it compile all child projects
lazy val `datarepo` =
  project
    .in(file("."))
    .aggregate(
      `foo`,
      `foo-other`,
      `sparkusingproject`,
      `sparkusingproject-test`,
      `sparkusingproject-other`,
    )

// Define individual projects, the directories they reside in, and other projects they depend on
lazy val `foo` =
  project
    .in(file("foo"))
    .settings(Common.defaultSettings: _*)

lazy val `foo-other` =
  project
    .in(file("foo-other"))
    .dependsOn(`foo`)
    .settings(Common.defaultSettings: _*)

Can you provide more details regarding the code? – Nikk
@Nikk: I added more details, is it better for you? – space borg

1 Answer

I just hit this exception in a test, and it was caused by running a Spark action on a thread different from the one where I had started the SparkSession. You might also want to disable parallelExecution in Test (this is recommended for Spark integration tests anyway).
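
For reference, the two settings together look roughly like this in build.sbt (just a sketch, using the in-scope syntax this build already uses):

    fork in Test := true                  // run tests in a forked JVM
    parallelExecution in Test := false    // run test suites sequentially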

Specifically, I was trying to execute multiple Spark actions in parallel, and I was doing that on Scala's ExecutionContext.global thread pool. When I switched to a dedicated pool created with Executors.newFixedThreadPool, everything started working fine.
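
A minimal sketch of that change (the spark session, the CSV paths and the pool size are placeholders from my own test, not anything specific to your build):

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    // A dedicated pool created from the thread that owns the SparkSession;
    // its worker threads inherit that thread's context class loader.
    implicit val sparkEc: ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

    val counts = Future.sequence(Seq(
      Future(spark.read.csv("data/a.csv").count()),   // hypothetical inputs
      Future(spark.read.csv("data/b.csv").count())
    ))

    Await.result(counts, Duration.Inf)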

AFAICT this is because in DataSource.scala:610, Spark gets the thread's ContextClassLoader:

    val loader = Utils.getContextOrSparkClassLoader

and, when running in Scala's default thread pool, that class loader does not contain the relevant classes and interfaces. When you create a new thread pool instead, its threads inherit the correct class loader from the current thread, and everything works fine afterwards.
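
If swapping the pool is not an option, the same idea can be applied by hand: capture the class loader of the thread that created the SparkSession and install it on whatever thread runs the action. A rough sketch of the mechanism (not something I needed myself):

    // Capture the class loader on the thread that built the SparkSession...
    val sparkClassLoader = Thread.currentThread().getContextClassLoader

    // ...and temporarily install it on the thread running the Spark action.
    def withSparkClassLoader[A](body: => A): A = {
      val previous = Thread.currentThread().getContextClassLoader
      Thread.currentThread().setContextClassLoader(sparkClassLoader)
      try body
      finally Thread.currentThread().setContextClassLoader(previous)
    }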