0 votes

I have a dataframe which I am inserting into an existing partitioned Hive table using Spark SQL (with dynamic partitioning). Once the dataframe has been written, I would like to know which partitions it has just created in Hive.

I could query the dataframe for its distinct partition values, but that takes a very long time because it has to re-run the dataframe's entire lineage.

I could persist the dataframe before writing to Hive, so that both the write and the distinct-on-partition-column operation run on top of the cached dataframe. But my dataframe is extremely large and I don't want to spend more time persisting it.
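For context, the persist-then-distinct approach described above would look roughly like the sketch below; the table name and the partition column name ("partition_date") are placeholders, not my actual schema.

df.persist()
df.write.insertInto("<db_name>.<tbl_name>")
// Distinct partition values now come from the cached data rather than the full lineage
val writtenPartitions = df.select("partition_date").distinct().collect()
df.unpersist()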

I know all the partition information is stored in the Hive Metastore. Are there any metastore APIs in Spark that could help retrieve only the new partitions that were created?

Which column have you partitioned the data on? Check this, it may help: stackoverflow.com/questions/36095790/… - vikrant rana
One of the date columns in the dataframe. - rasberry

2 Answers

0 votes

You can use HiveMetaStoreClient to retrieve partition data for a table:

import org.apache.hadoop.hive.conf.HiveConf
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

val hiveConf = new HiveConf(spark.sparkContext.hadoopConfiguration, classOf[HiveConf])
val cli = new HiveMetaStoreClient(hiveConf)

/* Get the list of partition values prior to the DF insert */
val existingPartitions = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue)
  .asScala.map(_.getValues.asScala.mkString(","))

/* Insert DF contents into the table */
df.write.insertInto("<db_name>.<tbl_name>")

/* Fetch the list of partition values again and diff it with the previous list */
val newPartitions = cli.listPartitions("<db_name>", "<tbl_name>", Short.MaxValue)
  .asScala.map(_.getValues.asScala.mkString(","))
val deltaPartitions = newPartitions.diff(existingPartitions)
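If you need the delta as column-name/value pairs rather than comma-joined strings, a small follow-up along these lines should work (it reuses the same cli; partitionKeys and deltaSpecs are just illustrative names):

/* Partition column names, in order, from the table definition */
val partitionKeys = cli.getTable("<db_name>", "<tbl_name>").getPartitionKeys.asScala.map(_.getName)
/* Pair each new partition's values back with the partition column names */
val deltaSpecs = deltaPartitions.map(values => partitionKeys.zip(values.split(",")).toMap)
/* Release the metastore connection when finished */
cli.close()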
0 votes

// Epoch time in milliseconds, captured before inserting the dataframe
val epochTime = System.currentTimeMillis()
val partitionName = "<partition_column_name>"   // name of the partition column

df.write.insertInto("<db_name>.<tbl_name>")

// Keep only the catalog partitions created or modified at/after epochTime
val catalogPartitions = spark.sharedState.externalCatalog.listPartitions("<db_name>", "<tbl_name>")
val processedPartitions = catalogPartitions.filter { cp =>
  val ddlTimeUpdated = cp.parameters.get("transient_lastDdlTime").exists(_.toLong >= epochTime / 1000)
  (ddlTimeUpdated || cp.lastAccessTime >= epochTime || cp.createTime >= epochTime) &&
    cp.spec.contains(partitionName)
}.map(_.spec.getOrElse(partitionName, "")).toList

lastAccessTime comes back as 0 in most cases. createTime holds the time when the partition was created. But in parameters I found another field, transient_lastDdlTime, that contains the updated timestamp of the partition. To be safe, I check all three to get the partitions that were created or modified after the given epoch time.
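To verify which of the three timestamps is actually populated in a given environment, a quick diagnostic sketch like the following (reusing catalogPartitions from above) prints them per partition:

// Print the timestamps available on each catalog partition
catalogPartitions.foreach { cp =>
  println(s"spec=${cp.spec} createTime=${cp.createTime} lastAccessTime=${cp.lastAccessTime} " +
    s"transient_lastDdlTime=${cp.parameters.get("transient_lastDdlTime")}")
}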