I've been learning Spark recently and got confused about checkpointing.
I have learned that checkpointing can store an RDD in a local or HDFS directory, and that it truncates the lineage of the RDD. But how can I get the right checkpoint file in another driver program? Can Spark resolve the path automatically?
For example, suppose I checkpointed an RDD in a first driver program and want to reuse it in a second driver program, but the second driver program doesn't know the path of the checkpoint file. Is it possible to reuse the checkpoint file?
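Ideally I would want to do something like the following in the second driver. This is purely hypothetical; readCheckpointFile is a made-up method name and the paths are placeholders, I just want to illustrate what I'm looking for:

// Driver 1: checkpoint the RDD to a known directory (example path)
sc.setCheckpointDir("hdfs:///tmp/ckpt")
val rdd = sc.parallelize(1 to 100)
rdd.checkpoint()   // marks the RDD for checkpointing
rdd.count()        // first action materializes the checkpoint files

// Driver 2: what I wish I could do (hypothetical API, I don't know if it exists)
val restored = sc.readCheckpointFile[Int]("hdfs:///tmp/ckpt/...")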
I wrote a demo of checkpointing, shown below. I checkpoint the "sum" Dataset and collect it afterwards.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-demo").getOrCreate()
import spark.implicits._
// a checkpoint directory must be set before checkpoint() is called (example path)
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoint")

val ds = spark.read.option("delimiter", ",")
  .csv("/Users/lulijun/git/spark_study/src/main/resources/sparktest.csv")
  .toDF("dt", "org", "pay", "per", "ord", "origin")
val filtered = ds.filter($"dt" > "20171026")
val groupby = filtered.groupBy("dt")
val sum = groupby.agg(("ord", "sum"), ("pay", "max"))
sum.count()
sum.checkpoint()
sum.collect()
But I found that in the Spark job triggered by the "collect" action, the RDD never reads the checkpoint. Is it because the "sum" Dataset already exists in memory? I'm confused about the method "computeOrReadCheckpoint" (Spark source quoted below; a sketch of the behaviour I expected follows it). When does it actually read the checkpoint?
/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
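For reference, here is a minimal RDD-level sketch of the behaviour I expected (the path and numbers are placeholders I made up for the test):

// Minimal RDD-level checkpoint demo; /tmp/rdd-ckpt is an example path
sc.setCheckpointDir("/tmp/rdd-ckpt")
val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint()   // only marks the RDD for checkpointing; nothing is written yet
rdd.count()        // the first action materializes the checkpoint files
rdd.collect()      // I expected this second action to read from the checkpoint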
By the way, what's the main difference between an RDD checkpoint and checkpointing in Spark Streaming?
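For context, this is how I understand Spark Streaming checkpointing is set up, based on the programming guide (the path is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("streaming-ckpt-demo")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("/tmp/streaming-ckpt")   // example path; stores metadata and state
  // ... define DStream operations here ...
  ssc
}

// On restart, the whole streaming application is recovered from the checkpoint
val ssc = StreamingContext.getOrCreate("/tmp/streaming-ckpt", createContext _)
ssc.start()
ssc.awaitTermination()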
Any help would be appreciated.
Thanks!