1
votes

I am trying to understand Spark (2.4) Physical Plans. We interact with Spark via the SQL API.

I am using the following sql. The sql has one aggregation in Step 1 and a join operation in the next step. My intention was to repartition the source table before the aggregation step so that I could re-use this Exchange and avoid Shuffles (Exchanges) in the following steps (before the SM-Join step), but it did not work as per my expectation since Spark added Exchanges before the SMJ. Could you please help me understand where I am going wrong.

    create or replace temporary view prsn_dtl as
    select
    policy_num,
    prsn_id,
    eff_dt,
    from db.person_details
    cluster by policy_num;
    
    create or replace temporary view plcy_dtl as
    select
    policy_num,
    role_desc,
    prsn_actv_flg
    from plcy_detail;
    
    create or replace temporary view my_keys as
    select
    policy_num,
    prsn_id,
    max(eff_dt) eff_dt
    from prsn_dtl
    group by 1, 2;
    
    select
    keys.policy_num,
    keys.prsn_id,
    keys.eff_dt,
    plcy.role_desc,
    plcy.prsn_actv_flg
    from my_keys keys
    inner join plcy_dtl plcy
    on keys.policy_num = plcy.policy_num;

In the DAG representation I found 3 Exchanges (2 in the left branch and 1 in the right branch)-
Step 1) First one hashpartitioning(policy_num#92, 200) due to the manual cluster by before the aggregate
Step 2) Second was between the Aggregate operators on hashpartitioning(policy_num#163, prsn_id#164, 200)
Step 3) Finally hashpartitioning(policy_num#163) before the sort-merge Join

My question :
why did not the Exchange (from cluster by) from Step 1 above, get propagated downstream and was not re-used in Step 3 before the Sort-Merge Join.

My expectation was that Spark will reuse the Exchange from Step 1 (cluster by) and will not add another Exchange (before the SMJ) in Step 3, since the left-branch was repartitioned on policy_num early in the query.

Can anyone please explain where I am going wrong. Any help is appreciated.

Note: I am using Spark 2.4

Thanks

1

1 Answers

0
votes

The issue appears to be alleviated by not renaming columns in the queries and propagating the same columns to downstream queries.