1 vote

I am trying to write a DataFrame to Ignite using JDBC.

Spark version: 2.1

Ignite version: 2.3

JDK: 1.8

Scala: 2.11.8

This is my code snippet:

def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = {

  val conn = DataSource.conn
  var psmt:PreparedStatement = null

  try {
    OperationIgniteUtil.deleteIgniteData(conn,targetTable)

    hiveDF.foreachPartition({
      partitionOfRecords => {
        partitionOfRecords.foreach(
          row => for ( i <- 0 until row.length ) {
            psmt = OperationIgniteUtil.getInsertStatement(conn, targetTable, hiveDF.schema)
            psmt.setObject(i+1, row.get(i))
            psmt.execute()
          }
        )
      }
    })

  }catch {
    case e: Exception =>  e.printStackTrace()
  } finally {
    conn.close
  }
}

And then when I run it on Spark, it prints this error message:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2305)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2304)
    at com.pingan.pilot.ignite.common.OperationIgniteUtil$.WriteToIgnite(OperationIgniteUtil.scala:72)
    at com.pingan.pilot.ignite.etl.HdfsToIgnite$.main(HdfsToIgnite.scala:36)
    at com.pingan.pilot.ignite.etl.HdfsToIgnite.main(HdfsToIgnite.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.ignite.internal.jdbc2.JdbcConnection
Serialization stack:
    - object not serializable (class: org.apache.ignite.internal.jdbc2.JdbcConnection, value: org.apache.ignite.internal.jdbc2.JdbcConnection@7ebc2975)
    - field (class: com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, name: conn$1, type: interface java.sql.Connection)
    - object (class com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 27 more

Anyone know how to fix it? Thanks!


2 Answers

2 votes

The problem here is that you cannot serialize the connection to Ignite, DataSource.conn. The closure you pass to foreachPartition contains the connection as part of its scope, which is why Spark cannot serialize it.
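For reference, the standard JDBC workaround for this error is to open the connection inside foreachPartition, so that it is created on each executor rather than captured from the driver. A minimal sketch, reusing your targetTable and OperationIgniteUtil.getInsertStatement, and assuming a hypothetical helper OperationIgniteUtil.createConnection() that opens a fresh JDBC connection to Ignite:

val schema = hiveDF.schema // StructType is serializable, so capture it, not the DataFrame
hiveDF.foreachPartition { partitionOfRecords =>
  // Created on the executor, inside the partition, so no
  // non-serializable object is captured from the driver.
  val conn = OperationIgniteUtil.createConnection() // hypothetical helper
  val psmt = OperationIgniteUtil.getInsertStatement(conn, targetTable, schema)
  try {
    partitionOfRecords.foreach { row =>
      // Bind every column first, then insert the row once.
      for (i <- 0 until row.length) psmt.setObject(i + 1, row.get(i))
      psmt.execute()
    }
  } finally {
    psmt.close()
    conn.close()
  }
}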

Fortunately, Ignite also provides a custom implementation of RDD that allows you to save values to it without JDBC. You will need to create an IgniteContext first, then retrieve Ignite's shared RDD, which provides distributed access to Ignite, and use it to save the rows of your DataFrame:

val igniteContext = new IgniteContext(sparkContext, () => new IgniteConfiguration())
...

// Retrieve Ignite's shared RDD backed by the "partitioned" cache
val igniteRDD = igniteContext.fromCache("partitioned")
igniteRDD.saveValues(hiveDF.rdd)

More information is available in the Apache Ignite documentation.
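One usage note: saveValues stores each value under a key that Ignite generates for you. If you need explicit keys, for example to read the rows back or overwrite them later, IgniteRDD also offers savePairs. A sketch, assuming the first column of your DataFrame is a suitable key for the "partitioned" cache:

// Key each Row by its first column instead of letting Ignite generate keys.
// Assumes the cache's key type matches that column's type.
val typedRDD = igniteContext.fromCache[Any, Row]("partitioned")
typedRDD.savePairs(hiveDF.rdd.map(row => (row.get(0), row)))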

0 votes

You have to make the object that contains WriteToIgnite extend the Serializable interface.

object Test extends Serializable {
  def WriteToIgnite(hiveDF: DataFrame, targetTable: String): Unit = {
    ???
  }
}

I hope this resolves your problem.