3 votes

I have an RDD formed by reading a local text file roughly 117MB in size.

scala> rdd
res87: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:24

I cache the RDD:

scala> rdd.persist()
res84: rdd.type = MapPartitionsRDD[3] at textFile at <console>:24

After this I call the take(1) action on the RDD to force evaluation. Once this is done, I check the Spark UI's Storage page. It shows the fraction cached as only 2%, with 6.5MB in memory. Then I call the count action on the RDD, and when I check the Storage page again, those numbers have changed: the fraction cached is now 82% and the size in memory is 258.2MB. Does this mean that even after I ask Spark to cache an RDD, it only caches what is actually required by the subsequent action (since take(1) reads only the first element)? And when the second action, count, was triggered, it had to touch all the elements, so it ended up caching the remaining part as well? I have not come across any documented behavior like this; is it a bug?


2 Answers

1 vote

Based on the source code, you are right. When persist() is called, the RDD reference is only saved to a HashMap of persisted RDDs and registered with a special cleaner; no data is read at that point. The actual caching happens when the data is really read. Moreover, cached data can later be evicted, for example when there is not enough memory and no active reference to the data exists.
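To make the "persist only marks, caching happens on first read" idea concrete, here is a minimal pure-Scala sketch. It is hypothetical and not Spark's real code: partitions are plain Seqs, and eviction is left out.

```scala
// Hypothetical model of lazy caching: persist() only records intent;
// a partition is cached the first time it is actually computed.
class LazyCachingRDD[T](compute: Int => Seq[T], val numPartitions: Int) {
  private var marked = false
  private val cache = scala.collection.mutable.Map.empty[Int, Seq[T]]

  def persist(): this.type = { marked = true; this } // no data read here

  def partition(i: Int): Seq[T] =
    if (marked) cache.getOrElseUpdate(i, compute(i)) // cache on first read
    else compute(i)

  def cachedPartitions: Int = cache.size
}
```

With this model, calling persist() and then reading one partition (as take(1) would) leaves only one partition cached; an action that reads every partition (as count() would) fills the cache completely, which matches the behavior described in the question.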

1 vote

Spark only materializes RDDs on demand, that is, in response to an action, as mentioned in the previous answer. Most actions, such as count(), need to read all partitions of the RDD, but other actions do not, and for performance reasons they don't. take(x) and first(), which is essentially take(1), are examples of such actions. Imagine an RDD with millions of records spread over many partitions, where you only need to examine a few records through take(x). Materializing the entire RDD would be wasteful. Instead, Spark materializes a single partition and checks how many items it contains. Based on that number, it materializes more partitions to cover the needs of take(x) (I am simplifying the logic of take(x) here).
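The scale-up behavior described above can be sketched in plain Scala. This is a hypothetical simplification modeled loosely on Spark's take: partitions are ordinary Seqs, and the growth factor of 4 stands in for Spark's configurable scale-up logic.

```scala
// Simplified sketch of take(n)'s partition scale-up loop (not Spark's
// actual implementation): try one partition first, then grow the batch
// of partitions read on each pass until n items have been collected.
def take[T](partitions: Seq[Seq[T]], n: Int): Seq[T] = {
  val buf = scala.collection.mutable.ArrayBuffer.empty[T]
  var partsScanned = 0
  while (buf.size < n && partsScanned < partitions.size) {
    // First pass reads a single partition; later passes read more
    // (here simply 4x the partitions scanned so far).
    val numPartsToTry = if (partsScanned == 0) 1 else partsScanned * 4
    val p = math.min(numPartsToTry, partitions.size - partsScanned)
    val items = partitions.slice(partsScanned, partsScanned + p).flatten
    buf ++= items.take(n - buf.size)
    partsScanned += p
  }
  buf.toSeq
}
```

If the first partition already holds n items, only that one partition is ever read, which is exactly why take(1) in the question cached just a single partition.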

In your case take(1) requires a single partition, so only 1 partition is materialized and cached. Then, when you do a count(), all partitions need to be materialized and cached, to the extent your available memory allows.