I am trying to understand Spark (2.4) physical plans. We interact with Spark through the SQL API.
I am using the following SQL. It has one aggregation in Step 1 and a join in the next step. My intention was to repartition the source table before the aggregation so that I could reuse this Exchange and avoid further shuffles (Exchanges) in the following steps, up to the sort-merge join (SMJ). It did not work as I expected: Spark still added Exchanges before the SMJ. Could you please help me understand where I am going wrong?
create or replace temporary view prsn_dtl as
select
policy_num,
prsn_id,
eff_dt
from db.person_details
cluster by policy_num;
create or replace temporary view plcy_dtl as
select
policy_num,
role_desc,
prsn_actv_flg
from plcy_detail;
create or replace temporary view my_keys as
select
policy_num,
prsn_id,
max(eff_dt) eff_dt
from prsn_dtl
group by 1, 2;
select
keys.policy_num,
keys.prsn_id,
keys.eff_dt,
plcy.role_desc,
plcy.prsn_actv_flg
from my_keys keys
inner join plcy_dtl plcy
on keys.policy_num = plcy.policy_num;
In the DAG visualization I found 3 Exchanges (2 in the left branch and 1 in the right branch):
Step 1) The first, hashpartitioning(policy_num#92, 200), comes from the manual cluster by before the aggregate.
Step 2) The second, hashpartitioning(policy_num#163, prsn_id#164, 200), sits between the Aggregate operators.
Step 3) Finally, hashpartitioning(policy_num#163) appears before the sort-merge join.
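To make the planner's reasoning concrete, here is a toy Python model of the check that Spark's `EnsureRequirements` rule performs before each operator: an Exchange is added only when the child's output partitioning does not satisfy the distribution the operator requires. This is a simplified sketch, not Spark's code. The class names mirror Catalyst's `HashPartitioning`, `ClusteredDistribution` and `HashClusteredDistribution`, but the matching rules are reduced to a few lines, and it assumes (rather than verifies for this query) that in Spark 2.4 each side of a sort-merge join must be hash-partitioned on exactly the join keys. Attributes are modelled as `name#id` strings copied from the plan above, because Catalyst compares attribute references by expression ID, not by name.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class HashPartitioning:
    """Toy stand-in: the operator's output rows are hashed on these attributes."""
    exprs: Tuple[str, ...]


@dataclass(frozen=True)
class ClusteredDistribution:
    """Toy stand-in for an aggregate's requirement: equal keys share a partition."""
    exprs: Tuple[str, ...]


@dataclass(frozen=True)
class HashClusteredDistribution:
    """Toy stand-in for a sort-merge join side's requirement (assumed for 2.4):
    hash partitioning on exactly these keys, in this order."""
    exprs: Tuple[str, ...]


Required = Union[ClusteredDistribution, HashClusteredDistribution]


def satisfies(partitioning: HashPartitioning, required: Required) -> bool:
    if isinstance(required, ClusteredDistribution):
        # Hashing on a subset of the grouping keys already co-locates each group.
        return all(e in required.exprs for e in partitioning.exprs)
    if isinstance(required, HashClusteredDistribution):
        # Both join sides must hash the same keys, so the match must be exact.
        return partitioning.exprs == required.exprs
    raise TypeError(f"unsupported distribution: {required!r}")


def needs_exchange(child: HashPartitioning, required: Required) -> bool:
    """An Exchange is inserted whenever the child's partitioning falls short."""
    return not satisfies(child, required)


# Step 1: cluster by policy_num hashes the view's rows on policy_num#92.
after_cluster_by = HashPartitioning(("policy_num#92",))

# The final aggregate groups on (policy_num#163, prsn_id#164).
agg_required = ClusteredDistribution(("policy_num#163", "prsn_id#164"))

# Step 2: the aggregate's Exchange; the aggregate passes this partitioning through.
after_aggregate = HashPartitioning(("policy_num#163", "prsn_id#164"))

# Step 3: the sort-merge join asks for hash partitioning on policy_num#163 alone.
join_required = HashClusteredDistribution(("policy_num#163",))

for label, child, req in [
    ("aggregate", after_cluster_by, agg_required),
    ("sort-merge join", after_aggregate, join_required),
]:
    print(f"{label}: exchange needed = {needs_exchange(child, req)}")
```

Under these assumptions, once Step 2 re-hashes the left branch on (policy_num, prsn_id), the Step 1 partitioning no longer describes that branch, and a two-column partitioning does not match a one-column join key, so a third Exchange follows. Whether the differing IDs (#92 vs #163) are really what triggers Step 2 in this query is an assumption of the model.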
My question:
Why was the Exchange from Step 1 (from the cluster by) not propagated downstream and reused in Step 3, before the sort-merge join?
My expectation was that Spark would reuse the Exchange from Step 1 (cluster by) and would not add another Exchange before the SMJ in Step 3, since the left branch had already been repartitioned on policy_num early in the query.
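The expectation rests on a property of hash partitioning that can be checked outside Spark: if rows are hashed on policy_num alone, every (policy_num, prsn_id) group also lands in a single partition, so that layout is in principle sufficient for both the aggregate and the join on policy_num. The sketch below is plain Python, not Spark; the sample rows are hypothetical, `zlib.crc32` stands in for Spark's Murmur3 hash, and 4 partitions stand in for the default 200 (`spark.sql.shuffle.partitions`).

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # stand-in for spark.sql.shuffle.partitions (default 200)

# Hypothetical rows: (policy_num, prsn_id, eff_dt)
ROWS = [
    ("P1", "A", "2020-01-01"),
    ("P1", "A", "2021-06-30"),
    ("P1", "B", "2019-03-15"),
    ("P2", "C", "2020-11-11"),
    ("P2", "D", "2018-07-04"),
    ("P3", "E", "2022-02-02"),
]


def by_policy(row):
    return (row[0],)


def by_policy_prsn(row):
    return (row[0], row[1])


def hash_partition(rows, key_fn, num_partitions=NUM_PARTITIONS):
    """Assign each row to a partition using a deterministic hash of its key."""
    parts = defaultdict(list)
    for row in rows:
        pid = zlib.crc32(repr(key_fn(row)).encode()) % num_partitions
        parts[pid].append(row)
    return parts


def co_located(parts, key_fn):
    """True if every distinct key value occurs in exactly one partition."""
    home = {}
    for pid, rows in parts.items():
        for row in rows:
            if home.setdefault(key_fn(row), pid) != pid:
                return False
    return True


by_policy_parts = hash_partition(ROWS, by_policy)
print(co_located(by_policy_parts, by_policy))       # True
print(co_located(by_policy_parts, by_policy_prsn))  # True: groups never span partitions

# The reverse is not guaranteed: hashing on (policy_num, prsn_id) may split
# P1's rows across partitions, depending on the hash values.
by_pair_parts = hash_partition(ROWS, by_policy_prsn)
print(co_located(by_pair_parts, by_policy))
```

The asymmetry is the point: a layout hashed on policy_num serves a grouping on (policy_num, prsn_id), but a layout hashed on (policy_num, prsn_id) cannot stand in for one hashed on policy_num.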
Can anyone please explain where I am going wrong? Any help is appreciated.
Note: I am using Spark 2.4
Thanks