
I know there are already lots of answers on writing to HIVE from Spark, but none of them seem to work for me. First, some background: this is an older cluster running HDP 2.6, which means Hive 2 and Spark 2.1.

Here is an example program:

import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

val spark = SparkSession.builder()
    .appName("Test App")
    .config("spark.sql.warehouse.dir", "/app/hive/warehouse")
    .enableHiveSupport()
    .getOrCreate()

val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.write.saveAsTable("records_table")

If I log into the spark-shell and run that code, a new table called records_table shows up in Hive. However, if I deploy that code in a jar, and submit it to the cluster using spark-submit, I will see the table show up in the same HDFS location as all of the other HIVE tables, but it's not accessible to HIVE.

I know that in HDP 3.1 you have to use a HiveWarehouseConnector class, but I can't find any reference to that in HDP 2.6. Some people have mentioned the HiveContext class, while others say to just call enableHiveSupport on the SparkSession builder. I have tried both approaches, but neither seems to work. I have tried saveAsTable. I have tried insertInto. I have even tried creating a temp view and then running hiveContext.sql("create table if not exists mytable as select * from tmptable"). With each attempt, I get a Parquet file under hdfs:/apps/hive/warehouse, but I cannot access that table from HIVE itself.

1 Answer


Based on the information provided, here is what I suggest:

  1. Create the Spark session; enableHiveSupport is required:
val spark = SparkSession.builder()
    .appName("Test App")
    .enableHiveSupport()
    .getOrCreate()
  2. Next, execute the DDL for the target table via spark.sql:
// hdfsLocation is assumed to be a String, defined elsewhere, holding the HDFS path for the table data.
val ddlStr: String =
    s"""CREATE EXTERNAL TABLE IF NOT EXISTS records_table(key int, value string)
                 |ROW FORMAT SERDE
                 |  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
                 |STORED AS INPUTFORMAT
                 |  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
                 |OUTPUTFORMAT
                 |  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
                 |LOCATION '$hdfsLocation'""".stripMargin

spark.sql(ddlStr)
  3. Write the data as your use case requires:
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.write.format("orc").insertInto("records_table")
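
Since the question is about spark-submit specifically, here is how the three steps might be assembled into a single application object. Treat it as a minimal sketch under stated assumptions rather than a tested deployment: the object name RecordsToHive and the convention of passing the table's HDFS location as the first program argument are made up for illustration, and Record is the case class from the question.

```scala
import org.apache.spark.sql.SparkSession

// Case class from the question; defined at top level so Spark can derive its schema.
case class Record(key: Int, value: String)

// Hypothetical entry point for spark-submit.
object RecordsToHive {

  // Builds the DDL from step 2 for a caller-supplied HDFS location.
  def ddl(hdfsLocation: String): String =
    s"""CREATE EXTERNAL TABLE IF NOT EXISTS records_table(key int, value string)
       |ROW FORMAT SERDE
       |  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
       |STORED AS INPUTFORMAT
       |  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
       |OUTPUTFORMAT
       |  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
       |LOCATION '$hdfsLocation'""".stripMargin

  def main(args: Array[String]): Unit = {
    // Assumption: the table location is passed as the first program argument.
    val hdfsLocation = args(0)

    // Step 1: session with Hive support enabled.
    val spark = SparkSession.builder()
      .appName("Test App")
      .enableHiveSupport()
      .getOrCreate()

    // Step 2: register the table through DDL.
    spark.sql(ddl(hdfsLocation))

    // Step 3: insert into the table that the DDL declared.
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.write.format("orc").insertInto("records_table")

    spark.stop()
  }
}
```

Keeping the DDL in its own function means the statement can be inspected or logged before it is executed.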

Notes:

  1. This works the same way in spark-shell and spark-submit.
  2. Partitioning can be defined in the DDL, so do not use partitionBy when writing the DataFrame.
  3. Bucketing/clustering is not supported.
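
To illustrate note 2, here is a rough sketch of what a partitioned variant of the DDL could look like. The helper PartitionedDdl, the table name records_by_day, and the partition column dt are hypothetical names chosen for the example. It uses the shorter STORED AS ORC clause, which Hive treats as shorthand for the ORC SerDe, input format, and output format spelled out in the DDL above.

```scala
// Hypothetical helper: renders Hive DDL for an external ORC table with partition columns.
object PartitionedDdl {

  // Each column is a (name, hiveType) pair, e.g. ("key", "int").
  def build(table: String,
            columns: Seq[(String, String)],
            partitionColumns: Seq[(String, String)],
            location: String): String = {
    def render(cols: Seq[(String, String)]): String =
      cols.map { case (name, tpe) => s"$name $tpe" }.mkString(", ")

    // Partition columns are declared in PARTITIONED BY, not in the main column list.
    s"""CREATE EXTERNAL TABLE IF NOT EXISTS $table(${render(columns)})
       |PARTITIONED BY (${render(partitionColumns)})
       |STORED AS ORC
       |LOCATION '$location'""".stripMargin
  }
}
```

The resulting string would be passed to spark.sql as in step 2. Because insertInto matches columns by position rather than by name, the DataFrame should list the partition column last. A fully dynamic partition insert will likely also need hive.exec.dynamic.partition.mode set to nonstrict.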

Hope this helps. Cheers.