72
votes

What is the difference between a Spark checkpoint and persisting to disk? Are both of these stored on the local disk?

4
It is a very generic question. It would be better to add some context around it. To answer your question: it can be stored in any persistent storage area, such as local disk, HDFS, or NFS-mounted space. - Sumit
@Sumit - This is a very specific question about the differences between two Spark RDD methods. The answer can be objective and focused, as zero323's answer below demonstrates. - Nick Chammas

4 Answers

81
votes

There are a few important differences, but the fundamental one is what happens with lineage. persist / cache keeps the lineage intact, while checkpoint breaks it. Let's consider the following examples:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
  • cache / persist:

    val indCache  = rdd.mapValues(_ > 4)
    indCache.persist(StorageLevel.DISK_ONLY)
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
    indCache.count
    // 3
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |       CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
  • checkpoint:

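    // checkpoint needs a checkpoint directory to be set first;
    // the path below is only a placeholder, adjust it to your environment
    sc.setCheckpointDir("/tmp/spark-checkpoint")

    val indChk  = rdd.mapValues(_ > 4)
    indChk.checkpoint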
    
    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 []
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 []
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 []
    
    indChk.count
    // 3
    
    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ReliableCheckpointRDD[12] at count at <console>:27 []
    

As you can see, in the first case the lineage is preserved even if the data is fetched from the cache. This means the data can be recomputed from scratch if some partitions of indCache are lost. In the second case the lineage is completely lost after the checkpoint, and indChk no longer carries the information required to rebuild it.

checkpoint, unlike cache / persist, is computed separately from other jobs. That's why an RDD marked for checkpointing should be cached:

It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
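
The recomputation can be made visible with an accumulator. This is only a minimal sketch under assumptions that are not part of the answer above: a local master, a throwaway temporary directory as checkpoint directory, and a hypothetical helper named evaluations that counts how often the map function runs.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate reuses the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("checkpoint-recompute-sketch"))
// Assumption: a temporary local directory is good enough for this demo.
sc.setCheckpointDir(Files.createTempDirectory("spark-ckpt").toString)

// Hypothetical helper: returns how many times the map function was evaluated.
def evaluations(cacheFirst: Boolean): Long = {
  val calls = sc.longAccumulator("map calls")
  val rdd = sc.parallelize(1 to 10, 2).map { x => calls.add(1); x * 2 }
  if (cacheFirst) rdd.cache()
  rdd.checkpoint()
  rdd.count() // the action runs the job; the checkpoint is written right after it
  calls.sum
}

val withoutCache = evaluations(cacheFirst = false)
val withCache = evaluations(cacheFirst = true)
println(s"map calls without cache: $withoutCache, with cache: $withCache")
```

If the separate checkpoint job recomputes the RDD as described, the first number is twice the second, because without the cache every element is evaluated once for count and once more for writing the checkpoint.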

Finally, checkpointed data is persistent and is not removed after the SparkContext is destroyed.

Regarding data storage: SparkContext.setCheckpointDir, used by RDD.checkpoint, requires a DFS path when running in non-local mode. Otherwise it can be the local file system as well. localCheckpoint and persist without replication should use the local file system.
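
As a rough illustration of where the data ends up, the sketch below compares checkpoint with localCheckpoint. It assumes a local master and a temporary local directory. On a real cluster the directory would have to be a DFS path instead.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate reuses the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("checkpoint-location-sketch"))
// Assumption: a local temp directory, acceptable only because we run locally.
sc.setCheckpointDir(Files.createTempDirectory("spark-ckpt").toString)

// Reliable checkpoint: written under the configured checkpoint directory.
val reliable = sc.parallelize(1 to 10).map(_ * 2)
reliable.checkpoint()
reliable.count()

// Local checkpoint: kept in executor-local storage, no checkpoint file.
val local = sc.parallelize(1 to 10).map(_ * 2)
local.localCheckpoint()
local.count()

println(reliable.getCheckpointFile) // a path under the checkpoint directory
println(local.getCheckpointFile)    // no file is reported for a local checkpoint
```

Both RDDs report isCheckpointed as true after the action, but only the reliable one has a checkpoint file that outlives the executors.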

Important Note:

RDD checkpointing is a different concept from checkpointing in Spark Streaming. The former is designed to address the lineage issue; the latter is all about streaming reliability and failure recovery.

45
votes

I think you can find a very detailed answer here

While it is very hard to summarize everything on that page, I will say:

Persist

  • Persisting or caching with StorageLevel.DISK_ONLY causes the RDD to be computed and stored in a location such that subsequent uses of that RDD will not go back beyond that point when recomputing the lineage.
  • After persist is called, Spark still remembers the lineage of the RDD even though it does not need to re-run it.
  • After the application terminates, the cache is cleared and the files are destroyed.
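
A minimal sketch of these points, assuming a local master (the RDD is the one from the accepted answer; the variable names are only illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: local mode; in spark-shell getOrCreate reuses the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("persist-lineage-sketch"))

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
rdd.persist(StorageLevel.DISK_ONLY)
rdd.count() // materializes the persisted partitions

// The lineage is still there: the RDD keeps its parent dependencies.
val lineageKept = rdd.dependencies.nonEmpty
// The context tracks the RDD as persisted.
val tracked = sc.getPersistentRDDs.contains(rdd.id)

// Persisted data is temporary and can be dropped explicitly at any time.
rdd.unpersist(blocking = true)
val levelAfter = rdd.getStorageLevel
println(s"lineage kept: $lineageKept, tracked: $tracked, level after unpersist: $levelAfter")
```

After unpersist the storage level is back to StorageLevel.NONE, and the RDD can still be recomputed from its lineage.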

Checkpointing

  • Checkpointing stores the RDD physically in HDFS and destroys the lineage that created it.
  • The checkpoint files are not deleted even after the Spark application has terminated.
  • Checkpoint files can be used in a subsequent job run or driver program.
  • Checkpointing an RDD causes double computation unless the RDD is cached first: the RDD is computed once by the action that triggers the job and again by the separate job that writes it to the checkpoint directory.
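
The claim that checkpoint files outlive the application can be checked with a small experiment. This is a sketch under assumptions: a local master, a temporary local directory instead of HDFS, and default settings (in particular, automatic checkpoint cleanup is not enabled). Note that it stops the SparkContext, so it is better run in a throwaway session.

```scala
import java.nio.file.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate reuses the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("checkpoint-survives-sketch"))
// Assumption: a temp directory stands in for a real HDFS path.
sc.setCheckpointDir(Files.createTempDirectory("spark-ckpt").toString)
val hadoopConf = sc.hadoopConfiguration

val rdd = sc.parallelize(1 to 10).map(_ * 2)
rdd.checkpoint()
rdd.count() // the action triggers the checkpoint write

val checkpointPath = new Path(rdd.getCheckpointFile.get)

// Stop the context, then look at the file system directly.
sc.stop()
val stillThere = checkpointPath.getFileSystem(hadoopConf).exists(checkpointPath)
println(s"checkpoint at $checkpointPath still exists after stop: $stillThere")
```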

You may want to read the article for more of the details or internals of Spark's checkpointing or Cache operations.

7
votes
  1. persist(MEMORY_AND_DISK) stores the data frame in memory and on disk temporarily without breaking the lineage of the program, i.e. df.rdd.toDebugString() returns the same output. It is recommended to use persist(*) on a calculation that is going to be reused, to avoid recalculating intermediate results:

    from pyspark import StorageLevel

    df = df.persist(StorageLevel.MEMORY_AND_DISK)
    calculation1(df)
    calculation2(df)
    

    Note that caching the data frame does not guarantee that it will remain in memory until you use it the next time. Depending on memory usage, the cache can be discarded.

  2. checkpoint(), on the other hand, breaks the lineage and forces the data frame to be stored on disk. Unlike cache()/persist(), frequent checkpointing can slow down your program. Checkpoints are recommended when a) working in an unstable environment, to allow fast recovery from failures, or b) storing intermediate states of a calculation in which new entries of the RDD depend on the previous entries, i.e. to avoid recalculating a long dependency chain in case of failure.
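
Case b) could look roughly like the following sketch, written in Scala with the RDD API to match the accepted answer. The iteration count, the interval checkpointEvery and the update step are made-up values for illustration, and a local master with a temporary checkpoint directory is assumed.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate reuses the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("periodic-checkpoint-sketch"))
sc.setCheckpointDir(Files.createTempDirectory("spark-ckpt").toString)

val checkpointEvery = 10 // hypothetical interval
var current = sc.parallelize(1 to 100).map(_.toLong)

for (i <- 1 to 30) {
  // Each iteration depends on the previous one, so the lineage keeps growing.
  current = current.map(_ + 1)
  if (i % checkpointEvery == 0) {
    current.cache()      // avoid computing the chain twice for the checkpoint
    current.checkpoint() // mark for checkpointing
    current.count()      // action that materializes the checkpoint
  }
}

val total = current.sum()
println(s"total = $total, last RDD checkpointed: ${current.isCheckpointed}")
```

Every tenth iteration truncates the lineage, so a failure afterwards only needs to replay at most the steps since the last checkpoint. A longer-running job would probably also unpersist the RDDs cached at earlier checkpoints.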

1
votes

If you check the relevant part of the documentation, it talks about writing data to a reliable system, e.g. HDFS. But it is up to you to tell Apache Spark where to write its checkpoint information.

On the other hand, persisting is about caching data mostly in memory, as this part of the documentation clearly indicates.

So, it depends on what directory you gave to Apache Spark.