I've done a couple of experiments, as shown below. Apparently the DataFrame, once cached, remains cached (as shown by getPersistentRDDs and by the query plan, which still contains InMemoryTableScan/InMemoryRelation), even if all Python references to it are overwritten or deleted altogether using del, and even with garbage collection explicitly invoked on both the Python and JVM sides.
Experiment 1:
def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()   # action to materialise the cache
    return data

sc._jsc.getPersistentRDDs()   # nothing cached yet
df = func()
sc._jsc.getPersistentRDDs()   # the cached DataFrame now shows up
df2 = df.filter('col1 != 2')
del df                        # drop the only reference to the cached DataFrame
import gc
gc.collect()                  # Python garbage collection
sc._jvm.System.gc()           # request JVM garbage collection
sc._jsc.getPersistentRDDs()   # is it still cached?
df2.select('*').explain()     # does the derived DataFrame still read from the cache?
del df2
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()   # still cached, even with all references gone?
Results:
>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o234}
>>> df2 = df.filter('col1 != 2')
>>> del df
>>> import gc
>>> gc.collect()
93
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o240}
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]
         +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#172L AS col1#174L]
                  +- *(1) Scan ExistingRDD[_1#172L]
>>> del df2
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o250}
Experiment 2:
def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df = df.filter('col1 != 2')   # overwrite df, losing the reference to the cached DataFrame
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
df.select('*').explain()
del df
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
Results:
>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o317}
>>> df = df.filter('col1 != 2')
>>> import gc
>>> gc.collect()
244
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o323}
>>> df.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]
         +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#218L AS col1#220L]
                  +- *(1) Scan ExistingRDD[_1#218L]
>>> del df
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o333}
Experiment 3 (control experiment, to show that unpersist works):
def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df2 = df.filter('col1 != 2')
df2.select('*').explain()
df.unpersist()                # explicitly unpersist while the reference is still available
sc._jsc.getPersistentRDDs()   # cache is now empty
df2.select('*').explain()     # the plan no longer uses the in-memory relation
Results:
>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{116: JavaObject id=o398}
>>> df2 = df.filter('col1 != 2')
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]
         +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#310L AS col1#312L]
                  +- *(1) Scan ExistingRDD[_1#310L]
>>> df.unpersist()
DataFrame[col1: bigint]
>>> sc._jsc.getPersistentRDDs()
{}
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Project [_1#310L AS col1#312L]
+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))
   +- *(1) Scan ExistingRDD[_1#310L]
To answer the OP's questions:
Does that mean that the cached data frame is no longer available and will be garbage collected? Does that mean that the new post-filter df will compute everything from scratch, despite being generated from a previously cached data frame?
The experiments suggest the answer to both is no. The DataFrame stays cached and is not garbage collected, and, according to the query plan, the new DataFrame is computed from the cached (but no longer referenceable) DataFrame.
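One practical consequence: with every Python reference gone, there is no variable left to call unpersist() on. If you ever need to free such an orphaned entry (short of clearing the whole cache), one option is to go through the JVM handles that sc._jsc.getPersistentRDDs() returns. This is a minimal sketch, not a supported API: it assumes the py4j map can be iterated like a dict and that the returned JavaRDD handles expose unpersist(), and the target_id value is just the id reported in Experiment 1.

# Sketch only: relies on internal handles (sc._jsc) and py4j objects.
leftover = sc._jsc.getPersistentRDDs()      # e.g. {71: JavaObject id=o250} after Experiment 1
target_id = 71                              # hypothetical: the id of the entry you want to drop

for rdd_id, java_rdd in leftover.items():   # assumes the py4j map iterates like a dict
    if rdd_id == target_id:
        java_rdd.unpersist()                # free the blocks of just this cached RDD

sc._jsc.getPersistentRDDs()                 # the entry should no longer be listed

Note that this releases the storage blocks directly rather than going through Spark's cache manager, so keeping a reference and calling DataFrame.unpersist() (as in Experiment 3), or spark.catalog.clearCache(), is the cleaner route.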
Some helpful functions related to cache usage (if you don't want to do it through the Spark UI) are sc._jsc.getPersistentRDDs(), which shows the currently cached RDDs/DataFrames, and spark.catalog.clearCache(), which clears all cached RDDs/DataFrames.
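For example, to drop everything at once rather than a single entry (a minimal sketch; the getPersistentRDDs output shown matches what the experiments above reported):

print(sc._jsc.getPersistentRDDs())   # e.g. {71: JavaObject id=o250} while something is cached
spark.catalog.clearCache()           # public API: unpersists everything that is currently cached
print(sc._jsc.getPersistentRDDs())   # {} afterwards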
Am I deviating from best practice in doing the above?
I am no authority to judge you on this, but as one of the comments suggested, avoid reassigning to df, because DataFrames are immutable. Imagine you were coding in Scala and had defined df as a val: df = df.filter(...) would simply not compile. Python can't enforce that per se, but I think the best practice is to avoid overwriting any DataFrame variable, so that you can always call df.unpersist() later if you no longer need the cached results.
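For example, a pattern along these lines (just a sketch; the names base_df and filtered are illustrative) keeps the cached DataFrame reachable so it can be unpersisted later:

# Cache the base DataFrame under its own name and never overwrite that name.
base_df = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
base_df.cache()
base_df.count()                        # action to materialise the cache

# Derive new DataFrames under new names instead of reassigning base_df.
filtered = base_df.filter('col1 != 2')
filtered.show()                        # reads from the cached base_df

# When the cached results are no longer needed, the handle is still there.
base_df.unpersist()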