I know that when there are several stages in a Spark job, the intermediate stage RDDs are kept around until the job finishes. But reading about cache() and persist(), it feels like they do the same thing (apart from the MEMORY_AND_DISK, MEMORY_ONLY and MEMORY_ONLY_SER options that can be used). Can someone explain why we explicitly use cache() and persist() when intermediate RDDs already exist, and give some use cases for each?
1 Answer
Spark does not store an RDD by default. Instead, it uses a lazy evaluation mechanism. Simply put, Spark maintains the RDD lineage and builds a DAG, and no transformation is executed until an action is encountered. You can read this article for more details.
As for difference between cache() and persist(), from this article:
Both caching and persisting are used to save the Spark RDD, DataFrame and Dataset. The difference is that the RDD cache() method saves it to the default storage level (MEMORY_ONLY), whereas the persist() method stores it at a user-defined storage level.
cache() and persist() do the same thing; the only difference is that cache() always uses the default storage level (memory only for RDDs), while persist() accepts any storage level, including ones that spill to disk. So cache() is equivalent to calling persist() with the default storage level.
Whether to use cache() or persist() depends on the scenario. This article describes when caching is useful; in short, cache when:
- RDD re-use in iterative machine learning applications
- RDD re-use in standalone Spark applications
- RDD computation is expensive, so caching can help reduce the cost of recovery if one or more executors fail
As an example, you can look at the PageRank example:
...
lines = sc.textFile(sys.argv[1], 1)

# Loads all URLs from the input file and initializes their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()

# Initializes the rank of every URL to one.
ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

# Calculates and updates URL ranks iteratively using the PageRank algorithm.
for iteration in range(int(sys.argv[2])):
    # Calculates each URL's contributions to the ranks of the URLs it links to.
    contribs = links.join(ranks).flatMap(
        lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
...
In this example, the links RDD is reused in every iteration of the for loop, which is why it is cached: without caching, it would be recomputed from the input file on each iteration.
I hope this answer helps you out.