7
votes

I am using IntelliJ Community Edition with the Scala plugin and the Spark libraries. I am still learning Spark and am working in a Scala Worksheet.

I have written the code below, which removes punctuation marks from a String:

def removePunctuation(text: String): String = {
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}

Then I read a text file and try to remove punctuation:

val myfile = sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)

This gives the error below; any help would be appreciated:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(/home/ubuntu/src/main/scala/Test.sc:294)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(/home/ubuntu/src/main/scala/Test.sc:284)
  at org.apache.spark.util.ClosureCleaner$.clean(/home/ubuntu/src/main/scala/Test.sc:104)
  at org.apache.spark.SparkContext.clean(/home/ubuntu/src/main/scala/Test.sc:2090)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(/home/ubuntu/src/main/scala/Test.sc:366)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(/home/ubuntu/src/main/scala/Test.sc:365)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(/home/ubuntu/src/main/scala/Test.sc:147)
  at #worksheet#.#worksheet#(/home/ubuntu/src/main/scala/Test.sc:108)
Caused by: java.io.NotSerializableException: A$A21$A$A21
Serialization stack:
  - object not serializable (class: A$A21$A$A21, value: A$A21$A$A21@62db3891)
  - field (class: A$A21$A$A21$$anonfun$words$1, name: $outer, type: class A$A21$A$A21)
  - object (class A$A21$A$A21$$anonfun$words$1, )
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
  at A$A21$A$A21.words$lzycompute(Test.sc:27)
  at A$A21$A$A21.words(Test.sc:27)
  at A$A21$A$A21.get$$instance$$words(Test.sc:27)
  at A$A21$.main(Test.sc:73)
  at A$A21.main(Test.sc)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.jetbrains.plugins.scala.worksheet.MyWorksheetRunner.main(MyWorksheetRunner.java:22)

3 Answers

6
votes

As @TGaweda suggests, Spark's SerializationDebugger is very helpful for identifying "the serialization path leading from the given object to the problematic object." All the dollar signs before the "Serialization stack" in the stack trace indicate that the container object for your method is the problem.

While the easiest fix is to slap Serializable on your container class, I prefer to take advantage of the fact that Scala is a functional language and use your function as a first-class citizen:

sc.textFile("/home/ubuntu/data.txt",4).map { text =>
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}

Or if you really want to keep things separate:

val removePunctuation: String => String = (text: String) => {
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}
sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)

These options work because Regex is serializable, which you should confirm for yourself.
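One way to confirm this without Spark is to round-trip the pattern through plain Java serialization. The following is a minimal sketch, assuming it is compiled as ordinary Scala rather than run inside a worksheet; the names `RegexSerializationCheck` and `roundTrip` are hypothetical and not part of any library.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.matching.Regex

// Hypothetical helper object, used only for this check.
object RegexSerializationCheck {
  // Serialize a value with plain Java serialization and read it back.
  // writeObject throws java.io.NotSerializableException if the value
  // (or anything it references) is not serializable.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // The same pattern as in the question.
  val punctPattern: Regex = "[^a-zA-Z0-9\\s]".r

  // If Regex were not serializable, initializing this value would throw.
  val restored: Regex = roundTrip(punctPattern)
}
```

If the round trip succeeds, `RegexSerializationCheck.restored` should behave like the original pattern.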

On a secondary but very important note, constructing a Regex is expensive, so factor it out of your transformations for the sake of performance, possibly with a broadcast.
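As a rough sketch of factoring the pattern out (without a broadcast), the Regex can live in an object so that it is compiled once when the object is first used, rather than once per record. The object name `Punctuation` and its members are hypothetical, and the sketch assumes a top-level object in compiled code, not one defined inside a worksheet.

```scala
import scala.util.matching.Regex

// Hypothetical holder object, not part of the original answer.
object Punctuation {
  // Compiled once, when the object is first initialized,
  // instead of on every call.
  val pattern: Regex = "[^a-zA-Z0-9\\s]".r

  // A function value that reuses the precompiled pattern.
  val remove: String => String =
    text => pattern.replaceAllIn(text, "").toLowerCase
}
```

On a plain collection this is used as `Seq("Hello, World!").map(Punctuation.remove)`. With Spark, the same function value would presumably be passed to `map` in place of the inline lambda, with each executor JVM initializing the object on first use.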

10
votes

As T. Gaweda already pointed out, you're most likely defining your function in a class that's not serializable. Because it is a pure function, i.e. it doesn't depend on any state of the enclosing class, I suggest you put it into an object (for example, a companion object) that extends Serializable. This is Scala's equivalent of a Java static method:

object Helper extends Serializable {
  def removePunctuation(text: String): String = {
    val punctPattern = "[^a-zA-Z0-9\\s]".r
    punctPattern.replaceAllIn(text, "").toLowerCase
  }
}

3
votes

Read the stack trace; it contains:

$outer, type: class A$A21$A$A21

This is a very good hint. Your lambda is serializable, but the class that encloses it is not.

When you create a lambda expression that uses a member of the enclosing class (here, the method removePunctuation), the lambda holds a reference to the outer instance. In your case the outer class is not serializable: either it does not implement Serializable, or one of its fields is not an instance of Serializable.
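The effect can be reproduced without Spark. The sketch below uses plain Java serialization and assumes Scala 2.12 or later; `OuterCaptureDemo`, `Container`, and `isSerializable` are hypothetical names, with `Container` standing in for the non-serializable class that encloses the code in the question.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object OuterCaptureDemo {
  // Hypothetical stand-in for the enclosing class; note that it
  // does NOT extend Serializable.
  class Container {
    def removePunctuation(text: String): String =
      text.replaceAll("[^a-zA-Z0-9\\s]", "").toLowerCase

    // Calls a method of Container, so the function must hold a
    // reference to `this` (the outer instance).
    val viaMethod: String => String = text => removePunctuation(text)

    // Uses no member of Container, so no reference to `this` is needed.
    val standalone: String => String =
      text => text.replaceAll("[^a-zA-Z0-9\\s]", "").toLowerCase
  }

  // Returns true if Java serialization can write the value,
  // false if it hits a non-serializable object along the way.
  def isSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Under these assumptions, `isSerializable` should return false for `viaMethod` and true for `standalone`. Making `Container` extend Serializable, or moving the method into an object, removes the problem in the first case.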