I have two dataframes, df1 and df2, that I want to join many times on a high-cardinality field called visitor_id. I would like to perform only one initial shuffle and have all of the joins take place without shuffling/exchanging data between Spark executors.
To do so, I have created another column called visitor_partition that consistently assigns each visitor_id a random value in [0, 1000). I have used a custom partitioner to ensure that df1 and df2 are partitioned so that each partition contains rows from exactly one value of visitor_partition. This initial repartition is the only time I want to shuffle the data.
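For concreteness, the partition-key derivation and repartition look roughly like this in PySpark; the hash-based assignment below is a simplified stand-in for my custom partitioner:

from pyspark.sql import functions as F

# Deterministically map each visitor_id to a value in [0, 1000), so every row
# for a given visitor gets the same visitor_partition.
df1 = df1.withColumn("visitor_partition", F.expr("pmod(hash(visitor_id), 1000)"))
df2 = df2.withColumn("visitor_partition", F.expr("pmod(hash(visitor_id), 1000)"))

# Repartition into 1000 partitions keyed on visitor_partition. (This hash
# repartition only approximates the custom partitioner, which guarantees
# exactly one visitor_partition value per partition.)
df1 = df1.repartition(1000, "visitor_partition")
df2 = df2.repartition(1000, "visitor_partition")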
I have saved each dataframe to Parquet in S3, partitioning by visitor_partition. For each dataframe this creates 1000 partition directories, organized as df1/visitor_partition=0, df1/visitor_partition=1, ..., df1/visitor_partition=999.
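The write step is essentially the following; the S3 bucket/prefix is just a placeholder:

# Write each dataframe as Hive-style partitioned Parquet (placeholder paths).
df1.write.mode("overwrite").partitionBy("visitor_partition").parquet("s3://my-bucket/df1")
df2.write.mode("overwrite").partitionBy("visitor_partition").parquet("s3://my-bucket/df2")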
Now I load each dataframe from the Parquet output and register it as a temp view via df1.createOrReplaceTempView('df1') (and the same for df2), and then run the following query:
SELECT
    ...
FROM
    df1 FULL JOIN df2 ON
        df1.visitor_partition = df2.visitor_partition AND
        df1.visitor_id = df2.visitor_id
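In PySpark, the load-and-query step looks roughly like this; the paths and the selected columns are placeholders:

# Reload the partitioned Parquet data (placeholder paths) and register temp views.
df1 = spark.read.parquet("s3://my-bucket/df1")
df2 = spark.read.parquet("s3://my-bucket/df2")
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

joined = spark.sql("""
    SELECT df1.visitor_id, df1.visitor_partition  -- placeholder select list
    FROM df1 FULL JOIN df2 ON
        df1.visitor_partition = df2.visitor_partition AND
        df1.visitor_id = df2.visitor_id
""")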
In theory, the query execution planner should realize that no shuffling is necessary here: a single executor could, for example, load the data from df1/visitor_partition=1 and df2/visitor_partition=1 and join the rows there. In practice, however, Spark 2.4.4's query planner performs a full data shuffle.
Is there some way I can prevent this shuffle from taking place?