I want to run a Spark job on Spark Jobserver. During execution I get an exception:

Stack trace:

java.lang.RuntimeException: scala.ScalaReflectionException: class com.some.example.instrument.data.SQLMapping in JavaMirror with org.apache.spark.util.MutableURLClassLoader@55b699ef of type class org.apache.spark.util.MutableURLClassLoader with classpath [file:/app/spark-job-server.jar] and parent being sun.misc.Launcher$AppClassLoader@2e817b38 of type class sun.misc.Launcher$AppClassLoader with classpath [.../classpath jars/] not found.

at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
at com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1$$typecreator15$1.apply(DataRetriever.scala:136)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)
at com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1.apply(DataRetriever.scala:136)
at com.some.example.instrument.DataRetriever$$anonfun$combineMappings$1.apply(DataRetriever.scala:135)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Success.map(Try.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

In DataRetriever I convert a simple case class to a Dataset.

Case class definitions:

case class SQLMapping(id: String,
                      it: InstrumentPrivateKey,
                      cc: Option[String],
                      ri: Option[SourceInstrumentId],
                      p: Option[SourceInstrumentId],
                      m: Option[SourceInstrumentId])

case class SourceInstrumentId(instrumentId: Long,
                              providerId: String)

case class InstrumentPrivateKey(instrumentId: Long,
                                providerId: String,
                                clientId: String)

Code that causes the problem:

import session.implicits._
def someFunc(future: Future[ID]): Dataset[SQLMapping] = {
  future.map { f =>
    val seq: Seq[SQLMapping] = getFromEndpoint(f)
    val ds: Dataset[SQLMapping] = seq.toDS()
    ...
  }
}

The job sometimes works, but if I re-run the job, it throws the exception.

Update 28.03.2018: I forgot to mention one detail that turns out to be important: the Dataset was constructed inside of a Future.


1 Answer


Calling toDS() inside the Future is what causes the ScalaReflectionException, most likely because the encoder for SQLMapping is derived via Scala reflection on the Future's worker thread, and that thread's classloader (the MutableURLClassLoader in the error message) does not contain the jar with SQLMapping.

I decided to construct the Dataset outside of future.map.
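Here is a minimal sketch of that workaround, adapted from the someFunc snippet in the question. getFromEndpoint, ID, session and SQLMapping come from the question; the Await.result timeout and the exact restructuring are illustrative assumptions, and an implicit ExecutionContext is assumed to be in scope.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import session.implicits._

def someFunc(future: Future[ID]): Dataset[SQLMapping] = {
  // Only the plain Scala collection is produced inside the Future.
  val seqFuture: Future[Seq[SQLMapping]] = future.map(f => getFromEndpoint(f))
  // Materialize it, then call toDS() outside future.map, on the current thread.
  val seq: Seq[SQLMapping] = Await.result(seqFuture, 10.minutes)
  seq.toDS()
}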

You can verify that a Dataset can't be constructed inside future.map with the following example job.

package com.example.sparkapplications

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import spark.jobserver.SparkJob
import spark.jobserver.SparkJobValid
import spark.jobserver.SparkJobValidation

object FutureJob extends SparkJob {
  override def runJob(sc: SparkContext,
                      jobConfig: Config): Any = {
    val session = SparkSession.builder().config(sc.getConf).getOrCreate()
    import session.implicits._
    // The Dataset is deliberately built inside the Future to reproduce the exception.
    val f = Future {
      val seq = Seq(
        Dummy("1", 1),
        Dummy("2", 2),
        Dummy("3", 3),
        Dummy("4", 4),
        Dummy("5", 5)
      )

      val ds = seq.toDS

      ds.collect()
    }

    Await.result(f, 10.seconds)
  }

  case class Dummy(id: String, value: Long)
  override def validate(sc: SparkContext,
                        config: Config): SparkJobValidation = SparkJobValid
}
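For comparison, here is a sketch of the same job restructured in the way described above: only the Seq is built inside the Future, and toDS() runs after Await.result on the thread executing runJob. The object name FutureJobFixed and the timeout are illustrative.

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import spark.jobserver.{SparkJob, SparkJobValid, SparkJobValidation}

object FutureJobFixed extends SparkJob {
  override def runJob(sc: SparkContext,
                      jobConfig: Config): Any = {
    val session = SparkSession.builder().config(sc.getConf).getOrCreate()
    import session.implicits._

    // The Future only produces a plain Scala collection.
    val f = Future {
      Seq(Dummy("1", 1), Dummy("2", 2), Dummy("3", 3))
    }
    val seq = Await.result(f, 10.seconds)

    // toDS() is called outside future.map, on the thread running the job.
    seq.toDS().collect()
  }

  case class Dummy(id: String, value: Long)

  override def validate(sc: SparkContext,
                        config: Config): SparkJobValidation = SparkJobValid
}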

I will post an update later on whether the problem persists with Spark 2.3.0 and when the jar is passed directly via spark-submit.