13 votes

Recently I saw some strange behaviour of Spark.

I have a pipeline in my application in which I'm manipulating one big Dataset - pseudocode:

val data = spark.read.load(...)                 // load the big Dataset
  .join(df1, "key")                             // etc., more transformations
data.cache()                                    // cache so data is not recomputed for the later saves
data.write.parquet(...)                         // some save

val extension = data.join(...)                  // more transformations - joins, selects, etc.
extension.cache()                               // again, cache to avoid double computation
extension.count()
// (1) - here I call data.unpersist()
extension.write.csv(...)                        // some other save

extension.groupBy("key").agg(...)               // some aggregations
  .write.parquet(...)                           // other save; without the cache it would trigger recomputation of the whole dataset

However, when I call data.unpersist(), e.g. at point (1), Spark removes all Datasets from Storage, including the extension Dataset, which is not the one I tried to unpersist.

Is this expected behaviour? How can I free some memory by unpersisting an old Dataset without also unpersisting every Dataset that is "next in the chain"?

My setup:

  • Spark version: current master, RC for 2.3
  • Scala: 2.11
  • Java: OpenJDK 1.8

This question looks similar to Understanding Spark's caching, but here I perform some actions before unpersisting. First I count everything and then save to storage - I don't know whether caching works the same way for RDDs as for Datasets.

@raam86 I'm checking the Storage tab in the Spark UI. Also, I see that the Datasets are recomputed. – T. Gawęda

2 Answers

12 votes

This is expected behaviour from Spark caching. Spark doesn't want to keep invalid cached data, so it completely removes all cached plans that refer to the dataset.

This is to make sure the query stays correct. In the example you are creating the extension Dataset from the cached Dataset data. Once data is unpersisted, the extension Dataset can essentially no longer rely on the cached Dataset data.

Here is the pull request for the fix they made. You can also see the related JIRA ticket.
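A minimal sketch of how this cascading invalidation can be observed, assuming a local Spark 2.2/2.3 session and illustrative data (the names and Datasets below are not from the question); Dataset.storageLevel reports StorageLevel.NONE once a Dataset's cache entry has been dropped:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("unpersist-cascade-demo")
  .getOrCreate()
import spark.implicits._

val data = Seq((1, "a"), (2, "b")).toDF("key", "value").cache()
data.count()                     // materialise the cache of data

val extension = data.withColumn("big", $"key" > 1).cache()
extension.count()                // materialise the cache of extension

println(extension.storageLevel)  // a real storage level - extension is cached

data.unpersist()                 // on 2.2/2.3 this also drops the cached plan of extension
println(extension.storageLevel)  // StorageLevel.NONE - extension would be recomputed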

5 votes

Answer for Spark 2.4:

There was a ticket about correctness in Datasets and caching behaviour, see https://issues.apache.org/jira/browse/SPARK-24596

From Maryann Xue's description, caching now works in the following manner:

  1. Drop tables and regular (persistent) views: regular mode
  2. Drop temporary views: non-cascading mode
  3. Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
  4. Call DataSet.unpersist(): non-cascading mode
  5. Call Catalog.uncacheTable(): follow the same convention as drop tables/view, which is, use non-cascading mode for temporary views and regular mode for the rest

Where "regular mode" means mdoe from the questions and @Avishek's answer and non-cascading mode means, that extension won't be unpersisted