I try to write dataframe to ignite using jdbc ,
The Spark version is : 2.1
Ignite version:2.3
JDK:1.8
Scala:2.11.8
this is my code snippet:
def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = {
val conn = DataSource.conn
var psmt:PreparedStatement = null
try {
OperationIgniteUtil.deleteIgniteData(conn,targetTable)
hiveDF.foreachPartition({
partitionOfRecords => {
partitionOfRecords.foreach(
row => for ( i <- 0 until row.length ) {
psmt = OperationIgniteUtil.getInsertStatement(conn, targetTable, hiveDF.schema)
psmt.setObject(i+1, row.get(i))
psmt.execute()
}
)
}
})
}catch {
case e: Exception => e.printStackTrace()
} finally {
conn.close
}
}
and then I run on spark ,it print erro message:
org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923) at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2305) at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305) at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765) at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2304) at com.pingan.pilot.ignite.common.OperationIgniteUtil$.WriteToIgnite(OperationIgniteUtil.scala:72) at com.pingan.pilot.ignite.etl.HdfsToIgnite$.main(HdfsToIgnite.scala:36) at com.pingan.pilot.ignite.etl.HdfsToIgnite.main(HdfsToIgnite.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: org.apache.ignite.internal.jdbc2.JdbcConnection Serialization stack: - object not serializable (class: org.apache.ignite.internal.jdbc2.JdbcConnection, value: org.apache.ignite.internal.jdbc2.JdbcConnection@7ebc2975) - field (class: com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, name: conn$1, type: interface java.sql.Connection) - object (class com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, ) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) ... 27 more
Anyone konws I to fix it? Thanks!