0
votes

I have a following task ahead of me.

User provides set of IP addresses a config file while executing spark submit command.

Lets say that array looks like this :

val ips = Array(1,2,3,4,5)

There can be up to 100.000 values in array..

For all elements in array, I should read data for Cassandra, perform some computation and insert data back to Cassandra.

If I do:

ips.foreach(ip =>{
- read data from Casandra for specific "ip" // for each IP there is different amount of data to read (within the functions I determine start and end date for each IP)
- process it
- save it back to Cassandra})

this works fine.

I believe that process runs sequentially; I don't exploit parallelism.

On the other hand if I do:

val IPRdd = sc.parallelize(Array(1,2,3,4,5))
IPRdd.foreach(ip => {
- read data from Cassandra // I need to use spark context to make the query
-process it
save it back to Cassandra})

I get serialization exception, because spark is trying to serialize spark context, which is not serializable.

How to make this work, but still exploit parallelism.

Thanks

Edited

This is the execption I get:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1.apply(WibeeeBatchJob.scala:59) at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1.apply(WibeeeBatchJob.scala:54) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$.main(WibeeeBatchJob.scala:54) at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob.main(WibeeeBatchJob.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext Serialization stack: - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@311ff287) - field (class: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext) - object (class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1, ) - field (class: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1) - object (class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1$$anonfun$apply$1, ) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

1
What's the specific exception and the specific code causing that ?Brian Agnew

1 Answers

1
votes

Easiest thing to do is to use the Spark Cassandra Connector which can handle connection pooling and serialization.

With that you could do something like

sc.parallelize(inputData, numTasks)
  .mapPartitions {  it =>
    val con = CassandraConnection(yourConf)
    con.withSessionDo{ session =>
      //Use the session
    }
    //Do any other processing
  }.saveToCassandra("ks","table"

This would be completely manual operation of a Cassandra Connection. The sessions would all be automatically pooled and cached and if you prepare a statement those will be cached on the executor as well.

If you would like to use more built in methods, there also exists joinWithCassandraTable which may work in your situation.

sc.parallelize(inputData, numTasks)
  .joinWithCassandraTable("ks","table") //Retrieves all records for which input data is the primary key
  .map( //manipulate returned results if needed )
  .saveToCassandra("ks","table")