1
votes

We have a transactional table stored as ORC with ZLIB compression. It is an internal (managed) table. I can read the table through the Hive CLI, but when I run a select through Spark SQL, the result shows all the columns and 0 rows. Please help.

2
try running invalidate metadata database.table_name and then your select in pyspark. - samkart
refer this on how to refresh - samkart
@samkart I'm not able to run INVALIDATE METADATA statements, probably because we don't have Impala. REFRESH TABLE didn't work either; I still get an empty dataframe in Spark. - Manish

2 Answers

2
votes

This can also be done in PySpark, but it needs a few extra configuration settings. The code sample below reads the table through the Hive Warehouse Connector:

# Only these two imports are needed for the sample below
from pyspark.sql import SparkSession
from pyspark_llap import HiveWarehouseSession

spark = SparkSession.builder \
                   .appName("appname") \
                   .enableHiveSupport() \
                   .getOrCreate()

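# Build a Hive Warehouse Connector session on top of the Spark session
hive = HiveWarehouseSession.session(spark).build()
# Replace <schema.tablename> with the transactional table to read
sample = hive.executeQuery("""select * from <schema.tablename>""")
sample.show()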

To run the above with spark-submit, pass the connector jar, the PySpark zip, and the configuration as below:

spark-submit \
  --jars /usr/hdp/3.1.0.0-78/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar \
  --py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.1.0.0-78.zip \
  --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc URL;serviceDiscoveryMode=zooKeeperHA;zooKeeperNamespace=hs2ActivePassiveHA" \
  --conf spark.hadoop.hive.llap.daemon.service.hosts="@llap0" \
  --conf spark.datasource.hive.warehouse.load.staging.dir="/tmp" \
  --conf spark.hadoop.hive.zookeeper.quorum="all zookeeper urls" \
  --conf spark.sql.hive.hiveserver2.jdbc.url.principal="url for JDBC connection" \
  --conf spark.security.credentials.hiveserver2.enabled="true" \
  TestPysparkJob.py
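If the same settings have to be reused across several jobs, it may help to keep them in one place and generate the command. The following is only a sketch: `build_spark_submit` is a hypothetical helper, not part of Spark or the connector, and the placeholder values ("jdbc URL", "all zookeeper urls", "url for JDBC connection") are kept exactly as above and must be replaced with the values for your cluster.

```python
import shlex


def build_spark_submit(script, jars, py_files, conf):
    """Assemble a spark-submit argument list (hypothetical helper)."""
    args = ["spark-submit", "--jars", jars, "--py-files", py_files]
    for key, value in conf.items():
        # Each setting becomes its own --conf key=value pair
        args += ["--conf", f"{key}={value}"]
    args.append(script)
    return args


# Settings copied from the command above; the values are placeholders
hwc_conf = {
    "spark.sql.hive.hiveserver2.jdbc.url":
        "jdbc URL;serviceDiscoveryMode=zooKeeperHA;zooKeeperNamespace=hs2ActivePassiveHA",
    "spark.hadoop.hive.llap.daemon.service.hosts": "@llap0",
    "spark.datasource.hive.warehouse.load.staging.dir": "/tmp",
    "spark.hadoop.hive.zookeeper.quorum": "all zookeeper urls",
    "spark.sql.hive.hiveserver2.jdbc.url.principal": "url for JDBC connection",
    "spark.security.credentials.hiveserver2.enabled": "true",
}

command = build_spark_submit(
    script="TestPysparkJob.py",
    jars="/usr/hdp/3.1.0.0-78/hive_warehouse_connector/"
         "hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar",
    py_files="/usr/hdp/current/hive_warehouse_connector/"
             "pyspark_hwc-1.0.0.3.1.0.0-78.zip",
    conf=hwc_conf,
)

# Quote each argument so values containing ';' or spaces survive the shell
rendered = " ".join(shlex.quote(arg) for arg in command)
print(rendered)
```

The argument list in `command` can also be passed directly to `subprocess.run`, which avoids shell quoting altogether.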

1
votes

This can be done in Scala Spark using the Hive Warehouse Connector (I am using Hortonworks). The jar used is "hive-warehouse-connector_2.11-1.0.0.3.1.0.0-78.jar".

Sample Code:

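import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder
import org.apache.spark.sql.SparkSession

// Placeholder application name; use your own
val AppName = "appname"
val spark = SparkSession.builder().appName(AppName).enableHiveSupport().getOrCreate()
// Point the connector at HiveServer2
spark.conf.set("spark.sql.hive.hiveserver2.jdbc.url","//your hive url")
val hive = HiveWarehouseBuilder.session(spark).build()
// Read the transactional table through the connector
val res = hive.table("db.tablename")
// Show 20 rows without truncating column values
res.show(20, false)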