35
votes

I have a sample application that reads CSV files into a DataFrame. The DataFrame can be stored to a Hive table in parquet format using the method df.saveAsTable(tablename,mode).
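For context, a minimal sketch of that flow (written against the Spark 2.x API; the CSV path and table name are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# read the CSV files into a DataFrame (path is a placeholder)
df = spark.read.option("header", "true").csv("/path/to/csv/")

# store it to a Hive table in parquet format
df.write.format("parquet").mode("overwrite").saveAsTable("tablename")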

The above code works fine, but I have so much data for each day that I want to dynamically partition the Hive table based on creationdate (a column in the table).

Is there any way to dynamically partition the DataFrame and store it in the Hive warehouse? I want to refrain from hard-coding the insert statement using hivesqlcontext.sql(insert into table partition by(date)....).

This question can be considered an extension of: How to save DataFrame directly to Hive?

Any help is much appreciated.


7 Answers

40
votes

I believe it works something like this:

df is a DataFrame with year, month, and other columns:

df.write.partitionBy('year', 'month').saveAsTable(...)

or

df.write.partitionBy('year', 'month').insertInto(...)
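A fuller sketch, assuming a Hive-enabled session (the database/table name and sample rows are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# df must already contain the partition columns (year and month here)
df = spark.createDataFrame(
    [(2016, 1, "a"), (2016, 2, "b")], ["year", "month", "value"])

# saveAsTable creates the partitioned table if it does not exist yet
df.write.partitionBy("year", "month").mode("append").saveAsTable("mydb.mytable")

Note that insertInto expects the table to exist already and resolves columns by position (partition columns last); in Spark 2.x it cannot be combined with partitionBy.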
39
votes

I was able to write to a partitioned Hive table using df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

I had to enable the following properties to make it work.

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
8
votes

I also faced the same issue, but I resolved it with the following tricks.

  1. When a table is partitioned, the partition column becomes case sensitive.

  2. The partition column should be present in the DataFrame with the same name (case sensitive); see the rename sketch after the code. Code:

    var dbName = "your database name"
    var finaltable = "your table name"

    // First check whether the table exists
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" + finaltable + "'").collect().length == 0) {
         // If the table is not available, create it
         println("Table Not Present \n  Creating table " + finaltable)
         sparkSession.sql("use " + dbName)
         sparkSession.sql("SET hive.exec.dynamic.partition = true")
         sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
         sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
         sparkSession.sql("create table " + dbName + "." + finaltable + " (EMP_ID string, EMP_Name string, EMP_Address string, EMP_Salary bigint) PARTITIONED BY (EMP_DEP string)")
    }
    // The table exists now; insert the DataFrame in append mode
    df.write.mode(SaveMode.Append).insertInto(dbName + "." + finaltable)
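As a hedged PySpark illustration of point 2 (all names hypothetical): rename the DataFrame column to match the partition column exactly, including case, before writing.

# hypothetical names: the table is partitioned by EMP_DEP, so the
# DataFrame column must carry exactly that name (case included)
df = df.withColumnRenamed("emp_dep", "EMP_DEP")
df.write.mode("append").partitionBy("EMP_DEP").saveAsTable("your_db.your_table")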
    
3
votes

It can be configured on the SparkSession in this way:

spark = SparkSession \
    .builder \
    ...
    .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
    .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

Or you can add them to a .properties file.
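For example, in spark-defaults.conf (file name illustrative; any Spark properties file works the same way):

spark.hadoop.hive.exec.dynamic.partition       true
spark.hadoop.hive.exec.dynamic.partition.mode  nonstrict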

The spark.hadoop prefix is needed by the Spark config (at least in 2.4), and here is how Spark sets this config:

  /**
   * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
   * configuration without the spark.hadoop. prefix.
   */
  def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
    SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
  }
2
votes

This is what works for me. I set these settings and then put the data in partitioned tables.

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", 
"nonstrict")
1
votes

This worked for me using Python and Spark 2.1.0.

Not sure if it's the best way to do this, but it works...

# WRITE DATA INTO A HIVE TABLE
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

### CREATE HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'hive_df'
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###

### CREATE NON HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_hive_df'
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###

### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")

spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
0
votes

df1.write.mode("append").format('ORC').partitionBy("date").option('path', '/hdfs_path').saveAsTable("DB.Partition_tablename")

It will create partitions from the "date" column values and, because the path option is set, will also write the data from the Spark DataFrame as a Hive external table.
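To verify, the partitions can then be listed (table name as above, using an existing SparkSession spark):

# list the partitions created from the "date" column values
spark.sql("SHOW PARTITIONS DB.Partition_tablename").show(truncate=False)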