Let's walk through an example. We'll create an RDD from a Seq of Ints in a single partition; the single partition is just to keep the output ordered for the rest of the example.
scala> val seq = Seq(1,2,3,4,5)
seq: Seq[Int] = List(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(seq, 1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:23
Now let's create two new RDDs that are mapped versions of the original:
scala> val firstMappedRDD = rdd.map { case i => println(s"firstMappedRDD calc for $i"); i * 2 }
firstMappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:25
scala> firstMappedRDD.toDebugString
res25: String =
(1) MapPartitionsRDD[12] at map at <console>:25 []
| ParallelCollectionRDD[11] at parallelize at <console>:23 []
scala> val secondMappedRDD = firstMappedRDD.map { case i => println(s"secondMappedRDD calc for $i"); i * 2 }
secondMappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at map at <console>:27
scala> secondMappedRDD.toDebugString
res26: String =
(1) MapPartitionsRDD[13] at map at <console>:27 []
| MapPartitionsRDD[12] at map at <console>:25 []
| ParallelCollectionRDD[11] at parallelize at <console>:23 []
We can see each RDD's lineage using toDebugString. I added a println to each map function to make it clear when the map is actually executed. Let's collect each RDD to see what happens:
scala> firstMappedRDD.collect()
firstMappedRDD calc for 1
firstMappedRDD calc for 2
firstMappedRDD calc for 3
firstMappedRDD calc for 4
firstMappedRDD calc for 5
res27: Array[Int] = Array(2, 4, 6, 8, 10)
scala> secondMappedRDD.collect()
firstMappedRDD calc for 1
secondMappedRDD calc for 2
firstMappedRDD calc for 2
secondMappedRDD calc for 4
firstMappedRDD calc for 3
secondMappedRDD calc for 6
firstMappedRDD calc for 4
secondMappedRDD calc for 8
firstMappedRDD calc for 5
secondMappedRDD calc for 10
res28: Array[Int] = Array(4, 8, 12, 16, 20)
As you would expect, the first map is computed again when we call secondMappedRDD.collect(). So now let's cache the first mapped RDD. Keep in mind that cache() is lazy: it only marks the RDD to be stored the next time it is computed.
scala> firstMappedRDD.cache()
res29: firstMappedRDD.type = MapPartitionsRDD[12] at map at <console>:25
scala> secondMappedRDD.toDebugString
res31: String =
(1) MapPartitionsRDD[13] at map at <console>:27 []
| MapPartitionsRDD[12] at map at <console>:25 []
| ParallelCollectionRDD[11] at parallelize at <console>:23 []
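At this point the lineage still looks the same: no cached partitions are listed, because cache() has only marked the RDD for storage and no action has run yet. If you want to confirm the RDD has been marked, getStorageLevel shows the level it was assigned (a quick check; the exact output varies by Spark version):

scala> firstMappedRDD.getStorageLevel   // should now report a memory-based storage level rather than NONE

Running an action will actually compute the data and populate the cache: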
scala> firstMappedRDD.count()
firstMappedRDD calc for 1
firstMappedRDD calc for 2
firstMappedRDD calc for 3
firstMappedRDD calc for 4
firstMappedRDD calc for 5
res32: Long = 5
scala> secondMappedRDD.toDebugString
res33: String =
(1) MapPartitionsRDD[13] at map at <console>:27 []
| MapPartitionsRDD[12] at map at <console>:25 []
| CachedPartitions: 1; MemorySize: 120.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[11] at parallelize at <console>:23 []
Now that the result of the first map is in the cache, the cached partitions show up in the lineage of the second mapped RDD. Let's call collect again:
scala> secondMappedRDD.collect
secondMappedRDD calc for 2
secondMappedRDD calc for 4
secondMappedRDD calc for 6
secondMappedRDD calc for 8
secondMappedRDD calc for 10
res34: Array[Int] = Array(4, 8, 12, 16, 20)
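Only the second map's printlns appear this time: the first map's results were served straight from the cache. As an aside, cache() is simply shorthand for persisting with the default memory-only storage level; if the cached data might not fit in memory, we could have chosen a different level instead (a sketch, not part of the session above):

scala> import org.apache.spark.storage.StorageLevel
scala> firstMappedRDD.persist(StorageLevel.MEMORY_AND_DISK)   // what we could have used in place of cache() above, spilling partitions that don't fit in memory to disk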
And now let's unpersist and call collect again.
scala> firstMappedRDD.unpersist()
res36: firstMappedRDD.type = MapPartitionsRDD[12] at map at <console>:25
scala> secondMappedRDD.toDebugString
res37: String =
(1) MapPartitionsRDD[13] at map at <console>:27 []
| MapPartitionsRDD[12] at map at <console>:25 []
| ParallelCollectionRDD[11] at parallelize at <console>:23 []
scala> secondMappedRDD.collect
firstMappedRDD calc for 1
secondMappedRDD calc for 2
firstMappedRDD calc for 2
secondMappedRDD calc for 4
firstMappedRDD calc for 3
secondMappedRDD calc for 6
firstMappedRDD calc for 4
secondMappedRDD calc for 8
firstMappedRDD calc for 5
secondMappedRDD calc for 10
res38: Array[Int] = Array(4, 8, 12, 16, 20)
So when we collect the result of the second mapped RDD after the first has been unpersisted, the map of the first gets called again.
If the source had been HDFS, or any other storage, the data would have been retrieved from the source again.
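To make that concrete, here is a hypothetical version of the same pattern with an external source; the path and file contents are made up, but sc.textFile behaves the same way for HDFS, S3, or local files:

// Hypothetical sketch: caching an RDD whose source is a text file on HDFS.
val lines = sc.textFile("hdfs:///path/to/numbers.txt")   // illustrative path only
val doubled = lines.map(_.trim.toInt * 2)
doubled.cache()       // mark for caching; nothing is read or computed yet
doubled.count()       // first action reads from HDFS and fills the cache
doubled.collect()     // served from the cached partitions, no second read from HDFS
doubled.unpersist()   // after this, the next action goes back to HDFS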