Recently I noticed some strange behaviour in Spark.
My application has a pipeline in which I manipulate one big Dataset - pseudocode:
val data = spark.read(...)
  .join(df1, "key")
  // etc., more transformations

data.cache() // cache so data is not recomputed for the saves below
data.write.parquet(...) // first save

val extension = data.join(...) // more transformations - joins, selects, etc.
extension.cache() // again, cache to avoid recomputing everything
extension.count() // materializes the cache
// (1)
extension.write.csv(...) // another save

extension.groupBy("key").agg(...) // some aggregations
  .write.parquet(...) // yet another save; without the cache this would recompute the whole chain
However, when I call data.unpersist(), i.e. at point (1), Spark removes all datasets from Storage, including the extension Dataset, which is not the dataset I tried to unpersist.
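
A minimal, self-contained sketch of how I observe this (the tiny example data and names here are made up purely for illustration; extension.storageLevel and spark.sparkContext.getPersistentRDDs are just what I use to inspect the cache):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[*]").appName("unpersist-test").getOrCreate()
import spark.implicits._

val data = Seq((1, "a"), (2, "b")).toDF("key", "value").cache()
data.count() // materialize the first cache

val extension = data.withColumn("extra", lit(1)).cache()
extension.count() // materialize the second cache

println(extension.storageLevel)                    // cached at this point
println(spark.sparkContext.getPersistentRDDs.size) // entries for both cached datasets

data.unpersist() // point (1)

println(extension.storageLevel)                    // I expected this to still be cached,
println(spark.sparkContext.getPersistentRDDs.size) // but extension is dropped from Storage as well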
Is this expected behaviour? How can I free some memory with unpersist() on an old Dataset without also unpersisting every Dataset that is "next in the chain"?
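
What I have considered so far (only a sketch, continuing the toy example above; I have not verified that it avoids the cascade, and the paths are made up) is cutting the lineage between data and extension, either by rebuilding extension from files already written or by checkpointing it:

// Variant 1: rebuild the derived Dataset from files already on disk,
// so it no longer references the cached plan of `data`
data.write.mode("overwrite").parquet("/tmp/data_parquet")                // hypothetical path
val dataFromDisk = spark.read.parquet("/tmp/data_parquet")
val extensionFromDisk = dataFromDisk.withColumn("extra", lit(1)).cache() // same toy transformation as above
extensionFromDisk.count()
data.unpersist() // does extensionFromDisk stay in Storage now?

// Variant 2: checkpoint the derived Dataset to truncate its lineage
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")                  // hypothetical dir
val extensionCheckpointed = extension.checkpoint()                       // eager checkpoint
extensionCheckpointed.count()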
My setup:
- Spark version: current master, RC for 2.3
- Scala: 2.11
- Java: OpenJDK 1.8
This question looks similar to Understanding Spark's caching, but here I run some actions before calling unpersist: first I count everything and then save to storage. I also don't know whether caching works the same way for RDDs as for Datasets.
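
For comparison, this is roughly the same chain written against plain RDDs (again only a sketch); I have not checked whether the derived RDD stays cached after the parent's unpersist:

val rdd1 = spark.sparkContext.parallelize(1 to 1000000).cache()
rdd1.count() // materialize the first cache

val rdd2 = rdd1.map(_ * 2).cache() // derived from rdd1, also cached
rdd2.count() // materialize the second cache

rdd1.unpersist() // analogous to data.unpersist() at point (1)
println(rdd2.getStorageLevel) // does the derived RDD stay cached here?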