
I’m testing a Spark Streaming application with the help of "com.holdenkarau.spark-testing-base" and scalatest.

import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfter, FunSuite }

class Test extends FunSuite with BeforeAndAfter with StreamingSuiteBase {

  var delim: String = ","

  before {
    System.clearProperty("spark.driver.port")
  }

  test("This Fails") {

    val source = scala.io.Source.fromURL(getClass.getResource("/some_logs.csv"))
    val input = source.getLines.toList

    val rowRDDOut = Calculator.`do`(sc.parallelize(input))   // Returns a DataFrame

    val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + delim + row.getAs[String](1)))

    source.close
  }
}

I get a serialization exception because of the field 'delim':

org.apache.spark.SparkException: Task not serializable
[info]   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
[info]   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
[info]   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
[info]   at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
[info]   at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
[info]   at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
[info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
[info]   at org.apache.spark.rdd.RDD.map(RDD.scala:323)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
[info] Serialization stack:
[info]  - object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@78b339fa)
[info]  - field (class: org.scalatest.FunSuite, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper)

If I replace 'delim' with the string literal inline, it works fine:

val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + "," + row.getAs[String](1)))

What's the difference between the first and second versions?

Thanks in advance!


2 Answers


The problem is not the type of delim (String); it's delim itself, or more precisely, where it is defined.

Try not to define variables outside your test() methods. If you define delim inside your test, it should work:

test("This Fails") {
  val delim = ","
  ...
}

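Now, you may ask why. delim is a field of the class Test, so the closure passed to map reads it through this, the enclosing Test instance. Before running the job, Spark checks that the closure can be serialized, and that means serializing everything it references, including the Test instance. Test (through FunSuite) holds a reference to org.scalatest.Assertions$AssertionsHelper, which is not Serializable (see the serialization stack in your trace). The literal "," and a local val do not need the enclosing instance, so the closure no longer drags Test along.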
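To see the difference between the field version and the literal/local version without Spark, here is a minimal plain-Scala sketch. It assumes Scala 2.12 or 2.13, where function literals are serializable and a lambda only captures the enclosing instance if its body uses it. It uses plain Java serialization as an approximation of the check that ClosureCleaner.ensureSerializable performs (Spark's ClosureCleaner does extra work before that check, and Scala 2.10/2.11 compile closures to anonymous classes instead, but reading a field of the enclosing class pulls in the enclosing instance in every version). FakeSuite and FakeAssertionsHelper are hypothetical stand-ins for the FunSuite subclass and org.scalatest.Assertions$AssertionsHelper.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.scalatest.Assertions$AssertionsHelper: NOT Serializable.
class FakeAssertionsHelper

// Hypothetical stand-in for the test class: Serializable itself (like FunSuite),
// but holding a field whose type is not Serializable.
class FakeSuite extends Serializable {
  val assertionsHelper = new FakeAssertionsHelper
  var delim: String = ","

  // Reads the field delim, so the lambda captures `this` (the whole FakeSuite).
  def capturesField: String => String = s => s + delim + s

  // Copies the field into a local val first, so only the String is captured.
  def capturesLocal: String => String = {
    val localDelim = delim
    s => s + localDelim + s
  }

  // Uses a literal, so nothing from the enclosing instance is captured.
  def capturesNothing: String => String = s => s + "," + s
}

object ClosureCheck {
  // Returns true if Java serialization of obj succeeds.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val suite = new FakeSuite
    println(isSerializable(suite.capturesField))   // false: drags in FakeSuite and its helper
    println(isSerializable(suite.capturesLocal))   // true
    println(isSerializable(suite.capturesNothing)) // true
  }
}
```

Copying the field into a local val, as capturesLocal does, is also the workaround the Spark programming guide suggests when a closure would otherwise reference a field of an enclosing object.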


I ran into this today, and the error persisted even after I moved all my code inside the test as suggested in the accepted answer.

I finally found out that I was using the wrong equality operator in the code (which the compiler did not catch). In my case it was something like:

// Wrong
df.filter(x => x.id === y)

// Right
df.filter(x => x.id == y)
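The likely reason the compiler accepts ===: ScalaTest's Assertions mixes in TripleEquals, whose implicit conversion to an Equalizer (which provides ===) is a method of the suite itself. So x.id === y compiles inside a test, but the lambda has to call that conversion through this and therefore captures the whole suite, exactly as in the accepted answer. The plain-Scala sketch below mimics that mechanism; StandInTripleEquals, Equalizer, StandInSuite and StandInHelper are hypothetical stand-ins (not ScalaTest's real classes), and Scala 2.12 or 2.13 is assumed.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.language.implicitConversions

// Hypothetical stand-in for org.scalatest.Assertions$AssertionsHelper (not Serializable).
class StandInHelper

// Hypothetical stand-in for ScalaTest's Equalizer: wraps a value and adds ===.
class Equalizer[A](left: A) {
  def ===(right: Any): Boolean = left == right
}

// Hypothetical stand-in for TripleEquals: the implicit conversion is a member
// of the trait, so using it inside a lambda goes through `this`.
trait StandInTripleEquals {
  implicit def convertToEqualizer[A](left: A): Equalizer[A] = new Equalizer(left)
}

// Serializable like a ScalaTest suite, but holding a non-serializable field.
class StandInSuite extends StandInTripleEquals with Serializable {
  val helper = new StandInHelper

  // x === y compiles via this.convertToEqualizer, so the lambda captures the suite.
  def filterWithTripleEquals(y: Int): Int => Boolean = x => x === y

  // Plain == needs nothing from the suite, so only y is captured.
  def filterWithDoubleEquals(y: Int): Int => Boolean = x => x == y
}

object TripleEqualsCheck {
  // Returns true if Java serialization of obj succeeds.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val suite = new StandInSuite
    println(isSerializable(suite.filterWithTripleEquals(1))) // false
    println(isSerializable(suite.filterWithDoubleEquals(1))) // true
  }
}
```

Both predicates return the same results; only the one using == can be serialized without pulling in the suite.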