I am having issues saving my DataFrame to a Hive table using the following API call:
df.write.mode(SaveMode.Append).format("parquet").partitionBy("ord_deal_year", "ord_deal_month", "ord_deal_day").insertInto(tableName)
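For context, here is a minimal sketch of the surrounding setup (the path and table name below are placeholders, not my actual ones):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

// Assuming an existing SparkContext `sc`.
val hiveContext = new HiveContext(sc)
// Source data for the write; placeholder path.
val df = hiveContext.read.parquet("/path/to/source")
// Fully qualified name of the target table; placeholder name.
val tableName = "my_db.ord_deals"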
My DataFrame has around 48 columns, whereas the Hive table has 90 columns. When I attempt to save the DataFrame, I receive the following error:
12:56:11 Executor task launch worker-0 ERROR Executor:96 Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.ArrayIndexOutOfBoundsException: 51
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.genericGet(rows.scala:253)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getAs(rows.scala:34)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.isNullAt(rows.scala:35)
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.isNullAt(rows.scala:247)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:107)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
12:56:11 task-result-getter-3 WARN TaskSetManager:71 Lost task 0.0 in stage 3.0 (TID 3, localhost): java.lang.ArrayIndexOutOfBoundsException: 51
    [same stack trace as above]
12:56:11 task-result-getter-3 ERROR TaskSetManager:75 Task 0 in stage 3.0 failed 1 times; aborting job
I have attempted to add the missing columns as typed null values using the following snippet:
import org.apache.spark.sql.functions.lit

val columnsAdded = columns.foldLeft(df) { case (d, (name, dataType)) =>
  if (d.columns.contains(name)) {
    d // column already exists; keep the DataFrame unchanged
  } else {
    d.withColumn(name, lit(null).cast(dataType)) // add the missing column as a typed null
  }
}
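Here, columns holds a (name, dataType) pair for every column of the target table. For reference, a sketch of how it can be derived from the Hive table's own schema (assuming the hiveContext from above):

import org.apache.spark.sql.types.DataType

// Read the table's schema through the metastore and keep (name, type) pairs.
val columns: Seq[(String, DataType)] =
  hiveContext.table(tableName).schema.map(f => (f.name, f.dataType))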
However, the same error persists.
I have also checked the following question: Error while trying to save the data to Hive tables from Dataframe. There, the cause was determined to be a mismatch between the DataFrame schema and the Hive table schema, so I compared my schema against the table's:
newDF.schema.foreach { f =>
  val hiveType = hiveSchema.get(f.name)
  println(
    s"Column ${f.name},${f.dataType}" +
      s" Column exists in Hive ${hiveType.isDefined}" +
      s" Hive table has the correct datatype ${hiveType.exists(_ == f.dataType)}"
  )
}
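In that snippet, newDF is the DataFrame after the missing columns were added (columnsAdded above), and hiveSchema is a name-to-type lookup for the table's columns; a sketch of how it can be built (again assuming hiveContext):

import org.apache.spark.sql.types.DataType

// Name-to-type map built from the same table schema.
val hiveSchema: Map[String, DataType] =
  hiveContext.table(tableName).schema.map(f => f.name -> f.dataType).toMap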
Has anyone seen this issue, or does anyone have advice on how to resolve it?