Even with a .cache()d RDD, Spark still seems to serialize the data for each task run. Consider this code:
import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// An element type that logs whenever Spark serializes or deserializes it.
class LoggingSerializable() extends Externalizable {
  override def writeExternal(out: ObjectOutput): Unit = {
    println("xxx serializing")
  }

  override def readExternal(in: ObjectInput): Unit = {
    println("xxx deserializing")
  }
}

object SparkSer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSer").setMaster("local")
    val spark = new SparkContext(conf)
    val rdd: RDD[LoggingSerializable] = spark.parallelize(Seq(new LoggingSerializable())).cache()
    println("xxx done loading")
    rdd.foreach(ConstantClosure)
    println("xxx done 1")
    rdd.foreach(ConstantClosure)
    println("xxx done 2")
    spark.stop()
  }
}

// A closure object that captures nothing.
object ConstantClosure extends (LoggingSerializable => Unit) with Serializable {
  def apply(t: LoggingSerializable): Unit = {
    println("xxx closure ran")
  }
}
It prints
xxx done loading
xxx serializing
xxx deserializing
xxx closure ran
xxx done 1
xxx serializing
xxx deserializing
xxx closure ran
xxx done 2
Even though I called .cache() on rdd, Spark still serializes the data for each call to .foreach. The official docs say
When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it).
and that MEMORY_ONLY means
Store RDD as deserialized Java objects in the JVM.
Note that Spark serializes data when it serializes the task, but ConstantClosure does not close over anything, so I don't understand why it would need to serialize any of the RDD's data.
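To convince myself that the closure itself carries no data, I can serialize ConstantClosure on its own with a plain ObjectOutputStream (a standalone sketch of mine, not part of the app above; the object name ClosureCheck is made up). Since no LoggingSerializable instance is reachable from the closure, this should not print "xxx serializing":

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Serialize only the closure, with no RDD involved. If ConstantClosure
// captured a LoggingSerializable, writeExternal would run and print
// "xxx serializing" here.
object ClosureCheck {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(ConstantClosure)
    out.close()
    println(s"closure alone serialized to ${bytes.size()} bytes")
  }
}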
I am asking because I would like to run Spark in local mode without a performance penalty, and having to serialize large RDD elements on every action can be very costly. I am not sure whether this problem is unique to local mode; it seems like Spark can't possibly be sending an RDD's data over the wire to workers for every action when the RDD is cached.
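For reference, this is roughly how I'm looking at the cost (my own diagnostic snippet, meant to be dropped into main above after rdd is created; timeAction is a helper name I made up): force the cache to be populated with count() first, then time the two actions separately to see whether the second one still pays the serialization cost.

// Sketch only: measure each action separately after the cache is populated.
def timeAction(label: String)(body: => Unit): Unit = {
  val start = System.nanoTime()
  body
  println(s"xxx $label took ${(System.nanoTime() - start) / 1e6} ms")
}

rdd.count()  // materialize the cached partitions before timing
timeAction("foreach 1")(rdd.foreach(ConstantClosure))
timeAction("foreach 2")(rdd.foreach(ConstantClosure))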
I'm using spark-core 3.0.0.