You can leverage the bucketing (CLUSTER BY) feature in Spark SQL for table creation, joins, etc. It acts like Hive bucketing to avoid the data exchange and the sort at join time, in Spark 2.1+.
See https://issues.apache.org/jira/browse/SPARK-15453
Currently, Hive cannot recognize this feature, because the bucketing metadata that Spark writes is incompatible with Hive's. That is why you cannot use the same syntax on the Hive side: even if Hive recognizes the table, it will treat all columns as an array.
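The reason co-bucketed tables can be joined without a shuffle is that both sides assign each row to a bucket with the same hash function, so matching keys are guaranteed to sit in the same bucket number on both sides. Below is a minimal plain-Scala sketch of that principle; note that Spark's real implementation uses its own Murmur3-based hash over the column values, so this helper is only an illustrative stand-in, not Spark's exact function:

```scala
import scala.util.hashing.MurmurHash3

// Illustrative bucket assignment: hash the bucketing columns, then take the
// value modulo the bucket count. This is NOT Spark's exact hash, only a
// sketch of the idea behind bucketBy(100, "country", "state").
def bucketId(keys: Seq[String], numBuckets: Int): Int = {
  val h = MurmurHash3.orderedHash(keys)
  // Force a non-negative bucket number (plain % can be negative for Int).
  ((h % numBuckets) + numBuckets) % numBuckets
}

// The same (country, state) pair always lands in the same bucket, so two
// tables bucketed the same way can be joined bucket-by-bucket, with no
// repartitioning (exchange) needed at join time.
val b1 = bucketId(Seq("US", "CA"), 100)
val b2 = bucketId(Seq("US", "CA"), 100)
```

Because the bucket number is a pure function of the key, bucket i of one table only ever needs to be matched against bucket i of the other.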
The following sample may give you some ideas:
Prepare the source:
val df = (0 until 80000).map(i => (i, i.toString, i.toString)).toDF("item_id", "country", "state").coalesce(1)
Create two bucketed tables from the source. You will see the warning "which is NOT compatible with Hive." (scroll right to read the full message):
df.write.bucketBy(100, "country", "state").sortBy("country", "state").saveAsTable("kofeng.lstg_bucket_test")
17/03/13 15:12:01 WARN HiveExternalCatalog: Persisting bucketed data source table `kofeng`.`lstg_bucket_test` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
df.write.bucketBy(100, "country", "state").sortBy("country", "state").saveAsTable("kofeng.lstg_bucket_test2")
Join them and explain the plan.
Disable broadcast join first, since the data volume is small enough that Spark would otherwise broadcast one side:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.sql.autoBroadcastJoinThreshold", "0").getOrCreate()
The plan avoids both the exchange and the sort in Spark 2.1.0 (and avoids the exchange in Spark 2.0); only a filter and a scan remain, which proves the bucketed data layout is being used.
val query = """
|SELECT *
|FROM
| kofeng.lstg_bucket_test a
|JOIN
| kofeng.lstg_bucket_test2 b
|ON a.country=b.country AND
| a.state=b.state
""".stripMargin
val joinDF = spark.sql(query)
scala> joinDF.queryExecution.executedPlan
res10: org.apache.spark.sql.execution.SparkPlan =
*SortMergeJoin [country#71, state#72], [country#74, state#75], Inner
:- *Project [item_id#70, country#71, state#72]
: +- *Filter (isnotnull(country#71) && isnotnull(state#72))
: +- *FileScan parquet kofeng.lstg_bucket_test[item_id#70,country#71,state#72] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ares-lvs-nn-ha/user/hive/warehouse/kofeng.db/lstg_bucket_test], PartitionFilters: [], PushedFilters: [IsNotNull(country), IsNotNull(state)], ReadSchema: struct<item_id:int,country:int,state:string>
+- *Project [item_id#73, country#74, state#75]
+- *Filter (isnotnull(country#74) && isnotnull(state#75))
+- *FileScan parquet kofeng.lstg_bucket_test2[item_id#73,country#74,state#75] Batched: true, Format: Parquet...
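The sort is avoided for the same reason the exchange is: sortBy("country", "state") left each bucket file already ordered on the join keys, so Spark can stream-merge matching bucket pairs directly. Here is a plain-Scala sketch of that merge step (illustrative only, not Spark's implementation; it assumes unique keys per side to keep the example short):

```scala
// Merge-join two bucket "files" that are already sorted on the key.
// Because both inputs are pre-sorted, one forward pass suffices:
// no extra sort and no data exchange is needed before joining.
def mergeJoin(left: List[(String, Int)],
              right: List[(String, Int)]): List[(String, Int, Int)] = {
  @annotation.tailrec
  def loop(l: List[(String, Int)], r: List[(String, Int)],
           acc: List[(String, Int, Int)]): List[(String, Int, Int)] =
    (l, r) match {
      case ((lk, lv) :: lt, (rk, rv) :: rt) =>
        if (lk == rk) loop(lt, rt, (lk, lv, rv) :: acc) // keys match: emit row
        else if (lk < rk) loop(lt, r, acc)              // advance smaller side
        else loop(l, rt, acc)
      case _ => acc.reverse                             // one side exhausted
    }
  loop(left, right, Nil)
}
```

This is what *SortMergeJoin does per bucket pair in the plan above, except that Spark reads the pre-sorted bucket files instead of sorting its inputs first.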