2
votes

I have two hive clustered tables t1 and t2

CREATE EXTERNAL TABLE `t1`(
  `t1_req_id` string,
   ...
PARTITIONED BY (`t1_stats_date` string)
CLUSTERED BY (t1_req_id) INTO 1000 BUCKETS
// t2 looks similar with same amount of buckets

The insert part happens in hive

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table `t1` partition(t1_stats_date,t1_stats_hour)
   select *
   from t1_raw
   where t1_stats_date='2020-05-10' and t1_stats_hour='12' AND 
   t1_req_id is not null

The code looks like as following:

 val t1 = spark.table("t1").as[T1]
 val t2=  spark.table("t2").as[T2]
 val outDS = t1.joinWith(t2, t1("t1_req_id) === t2("t2_req_id), "fullouter")
  .map { case (t1Obj, t2Obj) =>
    val t3:T3 = // do some logic
    t3 
  }
 outDS.toDF.write....

I see projection in DAG - but it seems that the job still does full data shuffle Also, while looking into the logs of executor I don't see it reads the same bucket of the two tables in one chunk - that what I would expect to find

There are spark.sql.sources.bucketing.enabled, spark.sessionState.conf.bucketingEnabled and spark.sql.join.preferSortMergeJoin flags

What am I missing? and why is there still full shuffle, if there are bucketed tables? The current spark version is 2.3.1

enter image description here enter image description here

1
I dont think that Spark will leverage clustered data with a fullouter join. Here you are asking Spark to read all the data in order to execute the outer join therefore Spark knows it should read the entire datasets and there are no filters to apply - abiratsis
The same picture was for me with left join. I have no clue why is that doesn't work, and what additional configurations I need to look. To populate data I've run it from hive (insert overwrite table ... select * ...) - Julias
ok I see, sorry I didnt read your example well. So one possible reason for bucketing not being leveraged could be that Spark does not retain the metadata of the columns being used during bucketing. Maybe you could try with dataframes instead of datasets? - abiratsis
data frame has the same result. It feels that hive metastore information isn't used. But if i run spark.sql("show extended t1").show(numRows=100, truncate=false) I doo see bucketed definitions - Julias

1 Answers

0
votes

One possibility here to check for is if you have a type mismatch. E.g. if the type of the join column is string in T1 and BIGINT in T2. Even if the types are both integer (e.g. one is INT, another BIGINT) Spark will still add shuffle here because different types use different hash functions for bucketing.