0 votes

I am trying to save a DataFrame as an external table which will be queried both with Spark and possibly with Hive, but somehow I cannot query or see any data with Hive. It works in Spark.

Here is how to reproduce the problem:

scala> println(spark.conf.get("spark.sql.catalogImplementation"))
hive
scala> spark.conf.set("hive.exec.dynamic.partition", "true")
scala> spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
scala> spark.conf.set("spark.sql.sources.bucketing.enabled", true)
scala> spark.conf.set("hive.exec.dynamic.partition", "true")
scala> spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
scala> spark.conf.set("hive.enforce.bucketing","true")
scala> spark.conf.set("optimize.sort.dynamic.partitionining","true")
scala> spark.conf.set("hive.vectorized.execution.enabled","true")
scala> spark.conf.set("hive.enforce.sorting","true")
scala> spark.conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
scala> spark.conf.set("hive.metastore.uris", "thrift://localhost:9083")
scala> var df = spark.range(20).withColumn("random", round(rand()*90))
df: org.apache.spark.sql.DataFrame = [id: bigint, random: double]

scala> df.head
res19: org.apache.spark.sql.Row = [0,46.0]                                      
scala> df.repartition(10, col("random")).write.mode("overwrite").option("compression", "snappy").option("path", "s3a://company-bucket/dev/hive_confs/").format("orc").bucketBy(10, "random").sortBy("random").saveAsTable("hive_random")
19/08/01 19:26:55 WARN HiveExternalCatalog: Persisting bucketed data source table `default`.`hive_random` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. 

Here is how I query it in Hive:

Beeline version 2.3.4-amzn-2 by Apache Hive
0: jdbc:hive2://localhost:10000/default> select * from hive_random;
+------------------+
| hive_random.col  |
+------------------+
+------------------+
No rows selected (0.213 seconds)

But it works fine in Spark:

scala> spark.sql("SELECT * FROM hive_random").show
+---+------+                                                                    
| id|random|
+---+------+
|  3|  13.0|
| 15|  13.0|
...
|  8|  46.0|
|  9|  65.0|
+---+------+
This column name is strange: hive_random.col - gorros
Spark can query it because it just uses the Hive metastore, but Hive uses MapReduce, Tez or whatever under the hood. You can try to set the Hive engine to Spark and test. - gorros
@gorros how do I set the hive engine? - Rafael Barros
I have added an answer below. - gorros
I would suggest creating the table using DDL first and then running df.repartition(10, col("random")).write.mode("overwrite").option("compression", "snappy").option("path", "s3a://company-bucket/dev/hive_confs/").format("orc").bucketBy(10, "random").sortBy("random"), omitting saveAsTable. - VenkateswaraCh

3 Answers

1 vote

There is a warning after your saveAsTable call. That's where the hint lies:

'Persisting bucketed data source table default.hive_random into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.'

The reason is that saveAsTable creates RDD partitions but not Hive partitions; the workaround is to create the table via HQL before calling DataFrame.saveAsTable.
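
A minimal sketch of that workaround, assuming the table name, schema and S3 path from the question (the exact DDL is an illustration, not from the original post). Note it finishes with insertInto rather than saveAsTable, since an overwriting saveAsTable would replace the pre-created definition; depending on the Spark version and the hive.enforce.bucketing setting, Spark may still refuse to populate a Hive-bucketed table:

// df and col are from the question's spark-shell session
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS hive_random (id BIGINT, random DOUBLE)
  CLUSTERED BY (random) SORTED BY (random) INTO 10 BUCKETS
  STORED AS ORC
  LOCATION 's3a://company-bucket/dev/hive_confs/'
""")

// Write into the pre-created table. insertInto keeps the Hive definition,
// whereas saveAsTable would re-register the table in Spark's own format.
df.repartition(10, col("random")).write.mode("overwrite").insertInto("hive_random")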

0 votes

I would suggest trying a couple of things. First, try to set the Hive execution engine to Spark:

set hive.execution.engine=spark;

Second, try to create the external table in the metastore first and then save the data to that table, as sketched below.
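
A rough beeline sketch of that second suggestion; the schema, bucketing clauses and S3 path mirror the question and are assumptions, not tested DDL:

set hive.execution.engine=spark;

CREATE EXTERNAL TABLE IF NOT EXISTS hive_random (id BIGINT, random DOUBLE)
CLUSTERED BY (random) SORTED BY (random) INTO 10 BUCKETS
STORED AS ORC
LOCATION 's3a://company-bucket/dev/hive_confs/';

With the table created from Hive's side, beeline should see the real id and random columns rather than the single hive_random.col placeholder.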

0 votes

The semantics of bucketed tables in Spark and Hive are different.
The doc has details of the differences in semantics. It states that:

Data is written to bucketed tables but the output does not adhere with expected bucketing spec. This leads to incorrect results when one tries to consume the Spark written bucketed table from Hive.

Workaround: if reading from both engines is the requirement, writes need to happen from Hive, as sketched below.
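
For illustration, a hedged sketch of such a Hive-side write in beeline; hive_random_staging is a hypothetical non-bucketed source table (e.g. plain ORC files written by Spark) and is not from the original post:

set hive.enforce.bucketing=true;

-- hive_random is assumed to be a Hive-created bucketed table;
-- hive_random_staging is a hypothetical non-bucketed source
INSERT OVERWRITE TABLE hive_random
SELECT id, random FROM hive_random_staging;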