2
votes

I have the following code, where the failure occurs at sc.parallelize():

val pairs = ret.cartesian(ret)
    .map {
        case ((k1, v1), (k2, v2)) => ((k1, k2), (v1.toList, v2.toList))
    }
for (pair <- pairs) {
    val test = sc.parallelize(pair._2._1.map(_._1))
}

Where

  • k1, k2 are strings
  • v1, v2 are lists of doubles

I am getting the following error whenever I try to access sc. What am I doing wrong here?

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
    at CorrelationCalc$.main(CorrelationCalc.scala:33)
    at CorrelationCalc.main(CorrelationCalc.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@40bee8c5)
    - field (class: CorrelationCalc$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class CorrelationCalc$$anonfun$main$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 20 more

1
Your first map seems to be a no-op. What do you intend to do with the second map? – Reactormonk
The first map is just to reformat my data; the second map is used to make an RDD from a list. It always fails at the second map since I cannot serialize sc. – ZhongBot
What is the type of pairs? – eliasah

1 Answer

2
votes

The for-comprehension is just doing a pairs.foreach() under the hood (you can see RDD.foreach in your stack trace).
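
For reference, the loop in your question desugars roughly into the following (reusing pairs and sc from your code), which is why the failure surfaces under RDD.foreach:

// Roughly what the compiler generates from `for (pair <- pairs) { ... }`:
// the loop body becomes a closure passed to foreach, and that closure
// captures `sc`, which Spark then tries to serialize and ship to the workers.
pairs.foreach { pair =>
    val test = sc.parallelize(pair._2._1.map(_._1))   // Task not serializable
}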

RDD operations are performed by the workers, and to have them do that work, anything you send to them must be serializable. The SparkContext, however, is attached to the master (the driver): it is responsible for managing the entire cluster, and it is not serializable.

If you want to create an RDD, you have to be aware of the whole cluster (that's the "D" in RDD: distributed), so you cannot create a new RDD from code running on the workers. And you probably don't want to turn each row of pairs into its own RDD (each bound to the same name!) anyway.
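
If you really do need a separate RDD per pair, the only option is to bring the data back to the driver first. A minimal sketch, assuming pairs and sc are as in your code and that the collected result is small enough to fit on the driver:

// collect() pulls the whole RDD back to the driver as a local Array,
// so the loop below is ordinary driver-side code and may use sc freely.
val pairsOnDriver = pairs.collect()
for (pair <- pairsOnDriver) {
    val test = sc.parallelize(pair._2._1)   // pair._2._1 is v1.toList, a List[Double]
}

Collecting only makes sense for small data, though; most of the time you want to stay inside a single RDD transformation, as below.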

It's difficult to tell from your code what you'd like to do, but it will probably look something like

val test = pairs.map(r => r._2._1)

which would give you an RDD where each row contains whatever was in v1.toList.
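
If it helps readability, the same thing can be written with a pattern match instead of the _1/_2 accessors, and with flatMap if you want one row per element rather than one row per list. Both are sketches assuming the tuple structure described in your question:

// one row per v1 list, equivalent to pairs.map(r => r._2._1)
val lists = pairs.map { case ((k1, k2), (v1, v2)) => v1 }

// one row per element of v1, if that's what you ultimately need
val flattened = pairs.flatMap { case ((k1, k2), (v1, v2)) => v1 }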