
I am using Spark 2.2.1 and Hive 2.1. I am trying to insert overwrite multiple partitions into an existing partitioned Hive/Parquet table.

The table was created using SparkSession.

I have a table 'mytable' with partition columns P1 and P2.

I have the following set on the SparkSession object:

"hive.exec.dynamic.partition"=true
"hive.exec.dynamic.partition.mode"="nonstrict"

Code:

val df = spark.read.csv(pathToNewData)
df.createOrReplaceTempView("updateTable") // 'df' may contain data from multiple partitions, i.e. multiple values for P1 and P2.

spark.sql("insert overwrite table mytable PARTITION(P1, P2) select c1, c2,..cn, P1, P2 from updateTable") // I made sure that partition columns P1 and P2 are at the end of projection list.

I am getting the following error:

org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException: Partition spec {p1=, p2=, P1=1085, P2=164590861} contains non-partition columns;

The dataframe 'df' has records for P1=1085, P2=164590861. It looks like an issue with casing (lowercase vs. uppercase). I tried both cases in my query but it still doesn't work.
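
For what it's worth, one way to check which casing Spark's catalog actually holds for the partition columns (a small diagnostic sketch, assuming the table above):

// List the table's columns from Spark's catalog and keep only the ones
// Spark treats as partition columns, so their stored casing is visible
spark.catalog.listColumns("mytable")
  .filter(_.isPartition)
  .show(false)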

EDIT:

The insert statement works with static partitioning, but that is not what I am looking for. For example, the following works:

spark.sql("insert overwrite table mytable PARTITION(P1=1085, P2=164590861) select c1, c2,..cn, P1, P2 from updateTable where P1=1085 and P2=164590861")

Create table stmt:

CREATE TABLE `my_table`(
  `c1` int, 
  `c2` int, 
  `c3` string, 
  `p1` int, 
  `p2` int)
PARTITIONED BY ( 
  `p1` int, 
  `p2` int)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'maprfs:/mds/hive/warehouse/my.db/xc_bonus'
TBLPROPERTIES (
  'spark.sql.partitionProvider'='catalog', 
  'spark.sql.sources.schema.numPartCols'='2', 
  'spark.sql.sources.schema.numParts'='1', 
  'spark.sql.sources.schema.part.0'='{.spark struct metadata here.......}', 
  'spark.sql.sources.schema.partCol.0'='P1', -- Spark is using uppercase names for the partition columns, while Hive is using lowercase
  'spark.sql.sources.schema.partCol.1'='P2', 
  'transient_lastDdlTime'='1533665272')

In the above, spark.sql.sources.schema.partCol.0 uses all uppercase while the PARTITIONED BY clause uses all lowercase for the partition columns.

Can you share the 'CREATE TABLE ...' statement for 'mytable'? Was it created as a partitioned table? - venBigData
@venBigData added the create table stmt in the description - nir

2 Answers

0
votes

Based on the exception, and assuming that the table 'mytable' was created as a partitioned table with P1 and P2 as partitions, one way to overcome this exception would be to force a dummy partition manually before executing the command. Try doing:

spark.sql("alter table mytable add partition (p1=default, p2=default)").

Once successful, execute your insert overwrite statement. Hope this helps!
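
A sketch of the full sequence, assuming the integer partition columns from the question (the dummy values 0/0 and the short column list are placeholders):

// Register a placeholder partition before the dynamic-partition overwrite
spark.sql("ALTER TABLE mytable ADD IF NOT EXISTS PARTITION (p1=0, p2=0)")

// Then retry the insert overwrite with dynamic partitioning
spark.sql(
  """INSERT OVERWRITE TABLE mytable PARTITION(p1, p2)
    |SELECT c1, c2, c3, p1, p2 FROM updateTable""".stripMargin)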

0
votes

As I mentioned in the EDIT section, the issue was in fact the difference in partition column casing (lower vs. upper) between Hive and Spark! I created the Hive table with uppercase partition columns, but Hive still stored them internally as lowercase, while the Spark metadata kept them uppercase as I intended. Fixing the create statement to use all-lowercase partition columns fixed the issue for subsequent updates! If you are using Hive 2.1 and Spark 2.2, make sure the following properties in the create statement have the same casing.

PARTITIONED BY ( 
  `p1` int, 
  `p2` int)

'spark.sql.sources.schema.partCol.0'='p1', 
'spark.sql.sources.schema.partCol.1'='p2',
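
A minimal sketch of the fix, assuming the columns from the question (the table name my_table_fixed is illustrative): recreate the table with all-lowercase partition column names so the Hive and Spark metadata agree, then rerun the dynamic-partition overwrite.

// Recreate the table with lowercase partition columns
spark.sql(
  """CREATE TABLE my_table_fixed (
    |  c1 int,
    |  c2 int,
    |  c3 string)
    |PARTITIONED BY (p1 int, p2 int)
    |STORED AS PARQUET""".stripMargin)

// The dynamic-partition overwrite now resolves p1/p2 consistently
spark.sql(
  """INSERT OVERWRITE TABLE my_table_fixed PARTITION(p1, p2)
    |SELECT c1, c2, c3, p1, p2 FROM updateTable""".stripMargin)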