I am trying to connect to a DB2 database from a Spark Streaming application, and the statement that executes the database query is causing an "org.apache.spark.SparkException: Task not serializable" error. Please advise. Below is sample code for reference.
dataLines.foreachRDD { rdd =>
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  val dataRows = rdd.map(rs => rs.value).map(row =>
    row.split(",")(1) -> (row.split(",")(0), row.split(",")(1), row.split(",")(2),
      "cvflds_" + row.split(",")(3).toLowerCase, row.split(",")(4), row.split(",")(5), row.split(",")(6))
  )
  val db2Conn = getDB2Connection(spark, db2ConParams)
  dataRows.foreach { case (k, v) =>
    val table = v._4
    val dbQuery = s"(SELECT * FROM $table) tblResult"
    val df = getTableData(db2Conn, dbQuery)
    df.show(2)
  }
}
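As far as I can tell, the nested dataRows.foreach closure is capturing db2Conn, and DataFrameReader is not serializable. A minimal standalone sketch (all names here are my own, not from the real job) reproduces the same exception for me:

// Hypothetical reproduction: capturing any non-serializable object in an
// RDD closure fails Spark's closure-serialization check the same way.
val reader: DataFrameReader = spark.read.format("jdbc") // not Serializable
spark.sparkContext.parallelize(1 to 3).foreach { i =>
  println(reader) // the closure references reader, so Spark tries to serialize it
}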
Below is the supporting function code:
private def getDB2Connection(spark: SparkSession, db2ConParams: scala.collection.immutable.Map[String, String]): DataFrameReader = {
  spark.read.format("jdbc").options(db2ConParams)
}

private def getTableData(db2Con: DataFrameReader, tableName: String): DataFrame = {
  db2Con.option("dbtable", tableName).load()
}
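Both helpers work fine for me when called directly on the driver, for example (the schema/table name below is a placeholder):

// Driver-side usage of the helpers above; MYSCHEMA.MYTABLE is made up.
val reader = getDB2Connection(spark, db2ConParams)
val df = getTableData(reader, "(SELECT * FROM MYSCHEMA.MYTABLE) tblResult")
df.show(2)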
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
Below is the error log:
2018-03-28 22:12:21,487 [JobScheduler] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1522289540000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
    at ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1.apply(DB2DataLoadToKudu.scala:139)
    at ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1.apply(DB2DataLoadToKudu.scala:128)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.DataFrameReader
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.DataFrameReader, value: org.apache.spark.sql.DataFrameReader@15fdb01)
    - field (class: ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1$$anonfun$apply$2, name: db2Conn$1, type: class org.apache.spark.sql.DataFrameReader)
    - object (class ncc.org.civil.receiver.DB2DataLoadToKudu$$anonfun$createSparkContext$1$$anonfun$apply$2, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 30 more
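The only workaround I have come up with so far is to collect the distinct table names back to the driver and issue the JDBC reads there, since the DataFrameReader (and the SparkSession behind it) can only be used on the driver. A sketch of that variant, assuming the number of distinct tables per batch is small:

dataLines.foreachRDD { rdd =>
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  val db2Conn = getDB2Connection(spark, db2ConParams)
  // Bring only the distinct table names to the driver; field 3 holds the table suffix.
  val tables = rdd.map(rs => "cvflds_" + rs.value.split(",")(3).toLowerCase)
    .distinct()
    .collect()
  // The JDBC reads now run on the driver, so no DataFrameReader is shipped to executors.
  tables.foreach { table =>
    getTableData(db2Conn, s"(SELECT * FROM $table) tblResult").show(2)
  }
}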