5 votes

As per the Spark docs (https://spark.apache.org/docs/2.1.0/sql-programming-guide.html#supported-hive-features), the Hive statement CLUSTER BY is supported. But when I tried to create a table using the following query from beeline:

CREATE TABLE set_bucketing_test (key INT, value STRING) CLUSTERED BY (key) INTO 10 BUCKETS;

I get the following error:

Error: org.apache.spark.sql.catalyst.parser.ParseException:
Operation not allowed: CREATE TABLE ... CLUSTERED BY(line 1, pos 0)

Not sure what mistake I'm making. Any help?

Then what should be the query to create a CLUSTERED table? - Arun Kumar Sundaramurthy

1 Answer

0 votes

You can leverage the CLUSTER BY / bucketing feature in Spark SQL for table creation, table joins, etc., which acts like Hive bucketing to avoid data exchange and sort in Spark 2.1+.

See https://issues.apache.org/jira/browse/SPARK-15453

Currently, however, Hive can't recognize this feature, because the bucketing metadata is incompatible between Spark and Hive. That's why you can't use the same syntax: even if you register the table on the Hive side, Hive will treat all the columns as an array.
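Also note that the "CLUSTER BY" entry in that supported-Hive-features list appears under Hive query statements, i.e. SELECT ... CLUSTER BY col, not CREATE TABLE ... CLUSTERED BY. A minimal sketch of that clause from the Scala shell; the table some_db.some_table and its key column are placeholders, not objects from this answer:

// CLUSTER BY in a query repartitions by `key` and sorts within each partition,
// i.e. it behaves like DISTRIBUTE BY key SORT BY key.
// `some_db.some_table`, `key` and `value` are hypothetical names for illustration.
val clustered = spark.sql(
  """
    |SELECT key, value
    |FROM some_db.some_table
    |CLUSTER BY key
  """.stripMargin)
clustered.explain()  // plan should show an Exchange (hashpartitioning) followed by a Sort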

The following sample may give you some ideas:

prepare the source

// toDF needs the implicits that spark-shell imports automatically;
// in a standalone application add: import spark.implicits._
val df = (0 until 80000).map(i => (i, i.toString, i.toString)).toDF("item_id", "country", "state").coalesce(1)

create two bucket tables from source

You will see "which is NOT compatible with Hive." if you scroll the warning below to the right.

df.write.bucketBy(100, "country", "state").sortBy("country", "state").saveAsTable("kofeng.lstg_bucket_test")

17/03/13 15:12:01 WARN HiveExternalCatalog: Persisting bucketed data source table `kofeng`.`lstg_bucket_test` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

df.write.bucketBy(100, "country", "state").sortBy("country", "state").saveAsTable("kofeng.lstg_bucket_test2")
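To confirm that Spark has recorded the bucketing spec (in its own, Hive-incompatible format), you can inspect the table metadata. A minimal sketch, assuming the kofeng.lstg_bucket_test table created above:

// Should list "Num Buckets", "Bucket Columns" and "Sort Columns" for the data source table;
// Hive's own DESCRIBE of the same table would not show this bucketing information.
spark.sql("DESCRIBE FORMATTED kofeng.lstg_bucket_test").show(100, truncate = false)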

Join them and explain

Disable broadcast joins first, since the data volume is small.

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.sql.autoBroadcastJoinThreshold", "0").getOrCreate()
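If the session already exists (for example in spark-shell), the same threshold can also be changed at runtime rather than at builder time; a small sketch:

// autoBroadcastJoinThreshold is runtime-settable; -1 (like 0 above) disables broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")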

The plan avoids both the exchange and the sort in Spark 2.1.0 (and avoids the exchange in Spark 2.0); only a filter and a file scan remain on each side, which proves the bucketed data locality is being used.

val query = """
  |SELECT *
  |FROM
  |  kofeng.lstg_bucket_test a
  |JOIN
  |  kofeng.lstg_bucket_test2 b
  |ON a.country = b.country AND
  |   a.state = b.state
""".stripMargin
val joinDF = spark.sql(query)


    scala> joinDF.queryExecution.executedPlan
    res10: org.apache.spark.sql.execution.SparkPlan =
*SortMergeJoin [country#71, state#72], [country#74, state#75], Inner
:- *Project [item_id#70, country#71, state#72]
:  +- *Filter (isnotnull(country#71) && isnotnull(state#72))
:     +- *FileScan parquet kofeng.lstg_bucket_test[item_id#70,country#71,state#72] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ares-lvs-nn-ha/user/hive/warehouse/kofeng.db/lstg_bucket_test], PartitionFilters: [], PushedFilters: [IsNotNull(country), IsNotNull(state)], ReadSchema: struct<item_id:int,country:int,state:string>
+- *Project [item_id#73, country#74, state#75]
   +- *Filter (isnotnull(country#74) && isnotnull(state#75))
      +- *FileScan parquet kofeng.lstg_bucket_test2[item_id#73,country#74,state#75] Batched: true, Format: Parquet...
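
For contrast, here is a sketch (not part of the original answer) of the same join on non-bucketed copies of the data; explaining it should show Exchange (hashpartitioning) and Sort nodes on both sides, which the bucketed tables avoid. The table names kofeng.lstg_plain_test and kofeng.lstg_plain_test2 are made up for this illustration:

// Write the same source data without bucketBy/sortBy, then run the identical join.
df.write.saveAsTable("kofeng.lstg_plain_test")
df.write.saveAsTable("kofeng.lstg_plain_test2")

val plainJoin = spark.sql(
  """
    |SELECT *
    |FROM kofeng.lstg_plain_test a
    |JOIN kofeng.lstg_plain_test2 b
    |ON a.country = b.country AND a.state = b.state
  """.stripMargin)

// Expect Exchange hashpartitioning(...) and Sort above each FileScan here.
plainJoin.queryExecution.executedPlan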