0 votes

I am having issues saving my DataFrame to a Hive table using the following API call:

df.write.mode(SaveMode.Append).format("parquet").partitionBy("ord_deal_year", "ord_deal_month", "ord_deal_day").insertInto(tableName)

My DataFrame has around 48 columns, whereas the Hive table has 90 columns. When I attempt to save the DataFrame, I receive the following error:

12:56:11 Executor task launch worker-0 ERROR Executor:96  Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.ArrayIndexOutOfBoundsException: 51
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.genericGet(rows.scala:253)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getAs(rows.scala:34)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.isNullAt(rows.scala:35)
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.isNullAt(rows.scala:247)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:107)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
12:56:11 task-result-getter-3 WARN  TaskSetManager:71  Lost task 0.0 in stage 3.0 (TID 3, localhost): java.lang.ArrayIndexOutOfBoundsException: 51
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.genericGet(rows.scala:253)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getAs(rows.scala:34)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.isNullAt(rows.scala:35)
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.isNullAt(rows.scala:247)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:107)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

12:56:11 task-result-getter-3 ERROR TaskSetManager:75  Task 0 in stage 3.0 failed 1 times; aborting job

I have attempted to add the missing columns using the following snippet of code.

val columnsAdded = columns.foldLeft(df) { case (d, c) =>
  if (d.columns.contains(c._1)) {
    // column exists; skip it
    d
  } else {
    // column is not available so add it
    d.withColumn(c._1, lit(null).cast(c._2))
  }
}

But the same issue still remains.

I have checked the following question: Error while trying to save the data to Hive tables from Dataframe, where the cause was determined to be a mismatch between the DataFrame schema and the Hive table schema. I compared my DataFrame's schema against the Hive table with the following:

// hiveSchema: Map[String, DataType] built from the Hive table's columns
newDF.schema.map { i =>
  s"Column ${i.name},${i.dataType}" +
    s" Column exists in hive ${hiveSchema.get(i.name).isDefined}" +
    s" Hive Table has the correct datatype ${i.dataType == hiveSchema(i.name)}"
}.foreach(println)

Has anyone seen this issue, or does anyone have any advice on how to resolve it?


2 Answers

1 vote

I would explicitly select all the additional columns you need in order to pad out the missing properties.

Another thing to watch out for is that you need to get the columns in the correct order. Spark can write Parquet files that fit the schema just fine, but it ignores the column names you used. So if Hive has a: string, b: string and your Spark code generates a DataFrame with the columns ordered as b, a, it will write just fine, but the values will end up in the wrong columns.

So, taking the two suggestions combined, I would add a guard clause just before the write / insertInto that selects the exact list of columns Hive has in its metadata, in the exact order it expects.
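
A rough sketch of what that could look like (the helper name and the use of sqlContext.table to read the Hive table's schema back are my assumptions, not something from the question; adjust to your own context):

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructField

// Hypothetical helper: pad the DataFrame with any columns the Hive table
// declares but the DataFrame lacks, then select them in the table's order.
def alignToHiveTable(df: DataFrame, hiveFields: Seq[StructField]): DataFrame = {
  val padded = hiveFields.foldLeft(df) { case (d, f) =>
    if (d.columns.contains(f.name)) d
    else d.withColumn(f.name, lit(null).cast(f.dataType)) // missing column -> typed nulls
  }
  // Guard clause: exactly the columns Hive expects, in exactly its order.
  padded.select(hiveFields.map(f => col(f.name)): _*)
}

// Assumes the table's schema can be read back via table(); swap in your
// HiveContext / SparkSession as appropriate.
val aligned = alignToHiveTable(df, sqlContext.table(tableName).schema.fields)
aligned.write.mode(SaveMode.Append).insertInto(tableName)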

1 vote

When using insertInto, you don't have to use partitionBy; those columns will be used for partitioning in Hive either way.
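
For example, the write in the question could be reduced to something like this (a sketch based on the original call; the partition columns just need to be present in the DataFrame, conventionally last):

// No partitionBy: insertInto writes into the table's existing partitioning,
// so Hive picks the partition columns up from the table definition.
df.write.mode(SaveMode.Append).insertInto(tableName)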

By the way, DataFrame provides a way to pretty-print the schema out of the box with printSchema.
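
For example:

// Prints the schema as an indented tree of column names, types and nullability.
df.printSchema()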