
Is it possible to take the output of a transformation (RDD/DataFrame) and feed it to two independent transformations without recalculating the first transformation and without caching the whole dataset?

Long version

Consider the following case.

I have a very large dataset that doesn't fit in memory. I apply some transformations to it which prepare the data to be worked on efficiently (grouping, filtering, sorting, ...):

DATASET --(TF1: transformation with group by, etc)--> DF1
DF1 --(TF2: more_transformations_some_columns)--> output
DF1 --(TF3: more_transformations_other_columns)--> output2

I was wondering if there is any way (or one planned in development) to tell Spark that, after TF1, it must reuse the same results (at partition level, without caching everything!) to serve both TF2 and TF3.

This can be conceptually imagined as a cache() on each partition, with an automatic unpersist() once the partition has been consumed by the downstream transformations.
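For illustration, that behaviour could be emulated by hand with a single pass over the partitions, at the cost of giving up the DataFrame API (compute_tf2, compute_tf3, write_output1 and write_output2 are hypothetical placeholders, not real APIs):

def handle_partition(rows):
    rows = list(rows)                 # the partition is materialized once, in this task only
    write_output1(compute_tf2(rows))  # first consumer (TF2)
    write_output2(compute_tf3(rows))  # second consumer (TF3)
    # the partition data is dropped as soon as the task finishes

DF1.rdd.foreachPartition(handle_partition)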

I searched for a long time but couldn't find any built-in way of doing it.
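The closest thing I found is a disk-backed persist, but that still materializes the whole of DF1 (on disk rather than in memory), so it only sidesteps the memory pressure:

from pyspark import StorageLevel

# The entire dataset is still cached, just spilled to local disk
# instead of RAM, and is only freed by an explicit unpersist().
DF1.persist(StorageLevel.DISK_ONLY)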

My attempt:

DF1 = spark.read... .groupBy(...).agg(...)
DF2 = DF1.select("col1").cache()  # col1 fits in memory
DF3 = DF1.select("col1", transformation(other_cols))
DF3.write...  # force evaluation, hoping it also fills DF2's cache of col1

Unfortunately, evaluating DF3 does not populate DF2's cache of col1; Spark cannot guess the connection between the two plans. So apparently it isn't possible to ask Spark to cache only a few columns, which by itself would already alleviate the problem.
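For what it's worth, this can be confirmed by inspecting the physical plan (using the names from my attempt above); a reused cache would show up as an InMemoryTableScan node:

# No InMemoryTableScan appears here: DF2's cached plan is not a
# subplan of DF3's plan, so the cache cannot be reused.
DF3.explain()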

Any ideas?


1 Answer


I don't think it is possible to cache just some of the columns, but will this solve your problem?

DF1 = spark.read... .groupBy(...).agg(...)

# Cache the wider selection and derive both outputs from it:
DF3 = DF1.select("col1", transformation(other_cols)).cache()
DF3.write...               # first action: materializes DF3 and fills the cache
DF2 = DF3.select("col1")   # served from the cached DF3; TF1 is not recomputed
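And if DF3 itself is too big to cache, another common pattern is to materialize DF1 to disk once and read it back for both branches (the format and path below are just placeholders):

# Pay the cost of TF1 exactly once by saving its result...
DF1.write.parquet("/tmp/df1_materialized")

# ...then both downstream jobs scan the saved files instead of
# recomputing the groupBy/agg:
DF1_saved = spark.read.parquet("/tmp/df1_materialized")
output = DF1_saved.select("col1", transformation(other_cols))
output2 = DF1_saved.select("col1")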