
I am trying to query a DB2 database from a Spark Streaming application, and the statement that executes the database query fails with "org.apache.spark.SparkException: Task not serializable". Please advise. Sample code is below for reference.

        dataLines.foreachRDD{rdd=>
          val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)

          val dataRows=rdd.map(rs => rs.value).map(row =>
            row.split(",")(1)-> (row.split(",")(0), row.split(",")(1), row.split(",")(2)
              , "cvflds_"+row.split(",")(3).toLowerCase, row.split(",")(4), row.split(",")(5), row.split(",")(6))
          )

          val db2Conn = getDB2Connection(spark,db2ConParams)

          dataRows.foreach{ case (k,v) =>
              val table = v._4
              val dbQuery = s"(SELECT * FROM $table ) tblResult"
              val df=getTableData(db2Conn,dbQuery)
              df.show(2)
          }
        }


Below is the code of the helper functions:

  private def getDB2Connection(spark: SparkSession, db2ConParams:scala.collection.immutable.Map[String,String]): DataFrameReader = {
      spark.read.format("jdbc").options(db2ConParams)
  }

  private def getTableData(db2Con: DataFrameReader,tableName: String):DataFrame ={
      db2Con.option("dbtable",tableName).load()
  }



object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

Below is the error log:

2018-03-28 22:12:21,487 [JobScheduler] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1522289540000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
    at ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1.apply(DB2DataLoadToKudu.scala:139)
    at ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1.apply(DB2DataLoadToKudu.scala:128)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.DataFrameReader
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.DataFrameReader, value: org.apache.spark.sql.DataFrameReader@15fdb01)
    - field (class: ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1$$anonfun$apply$2, name: db2Conn$1, type: class org.apache.spark.sql.DataFrameReader)
    - object (class ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1$$anonfun$apply$2, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 30 more

Please post the full text of the error message, including the stack trace. - Ken Y-N
Hi Ken, I have added the error log. Please check. - Maruti K
Can you try moving the SparkSession creation line out of dataLines.foreachRDD? - Vinod Chandak
Hey Vinod, moving the SparkSession out of foreachRDD doesn't solve my problem. I have to query the database for each and every message I receive in the foreach loop. - Maruti K
Another guess is to turn the methods into functions. Can you check stackoverflow.com/questions/22592811/… - Vinod Chandak

1 Answer


Ideally, you should keep the closure passed to dataRows.foreach free of any connection objects, since that closure is serialized, shipped to the executors, and run there. This concept is covered in depth at this official link.

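In your case, the line below is what causes the issue. It sits inside the closure and references db2Conn, a DataFrameReader, which is not serializable (the stack trace names the captured field db2Conn$1):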

val df=getTableData(db2Conn,dbQuery)

So, instead of using Spark to load the DB2 table, which in your case (after combining the methods) becomes:

spark.read.format("jdbc").options(db2ConParams).option("dbtable",tableName).load()

use plain JDBC inside the closure. You can reuse db2ConParams in the JDBC code (I assume it's simple enough to be serializable). The link also suggests using rdd.foreachPartition and a ConnectionPool to optimize further.
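The plain-JDBC suggestion above could look roughly like the sketch below. It is only an illustration under stated assumptions: the object Db2PartitionLookup and its methods are hypothetical names, db2ConParams is assumed to contain the keys "url", "user" and "password", and the DB2 JDBC driver is assumed to be on the executor classpath. It opens one connection per partition with DriverManager rather than a real connection pool, and it checks the table name before interpolating it into SQL because that name comes from the message payload.

```scala
import java.sql.{Connection, DriverManager}

// Hypothetical helper; nothing here touches SparkSession or DataFrameReader,
// so it can safely run inside a closure on the executors.
object Db2PartitionLookup {

  // Accept only simple identifiers (optionally schema-qualified), because the
  // table name is taken from incoming data and spliced into the SQL text.
  def isSafeTableName(name: String): Boolean =
    name.matches("[A-Za-z_][A-Za-z0-9_.]*")

  // FETCH FIRST n ROWS ONLY is DB2 syntax for limiting the result size.
  def buildQuery(table: String, limit: Int): String = {
    require(isSafeTableName(table), s"unexpected table name: $table")
    s"SELECT * FROM $table FETCH FIRST $limit ROWS ONLY"
  }

  // Intended to be called once per partition. The keys "url", "user" and
  // "password" are assumptions about what db2ConParams contains.
  def processPartition(tables: Iterator[String], params: Map[String, String]): Unit = {
    if (tables.hasNext) {
      val conn: Connection =
        DriverManager.getConnection(params("url"), params("user"), params("password"))
      try {
        // Query each distinct table in this partition once.
        tables.toList.distinct.foreach { table =>
          val stmt = conn.createStatement()
          try {
            val rs = stmt.executeQuery(buildQuery(table, 2))
            try {
              val cols = rs.getMetaData.getColumnCount
              while (rs.next()) {
                val row = (1 to cols).map(i => String.valueOf(rs.getObject(i)))
                println(row.mkString(s"$table: ", ", ", ""))
              }
            } finally rs.close()
          } finally stmt.close()
        }
      } finally conn.close()
    }
  }
}

// Possible wiring inside dataLines.foreachRDD (needs Spark, shown as a comment only):
//   val params = db2ConParams  // local copy, so the closure does not capture `this`
//   dataRows.map { case (_, v) => v._4 }
//     .foreachPartition(it => Db2PartitionLookup.processPartition(it, params))
```

The println output ends up in the executor logs, not on the driver console, so this replaces df.show(2) only for debugging purposes.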

You have not mentioned what you do with the table data other than df.show(2). If the tables contain a large number of rows, please describe your use case in more detail; you may need to consider a different design.