I am new to Spark/Scala. I am trying to read some data from a Hive table into a Spark DataFrame and then add a column based on some conditions. Here is my code:

val DF = hiveContext.sql("select * from (select * from test_table where partition_date='2017-11-22') a JOIN (select max(id) as bid from test_table where partition_date='2017-11-22' group by at_id) b ON a.id=b.bid")

def dateDiff(partition_date: org.apache.spark.sql.Column, item_due_date: org.apache.spark.sql.Column): Long ={
      ChronoUnit.DAYS.between(LocalDate.parse(partition_date.toString()), LocalDate.parse(item_due_date.toString))
    }

val finalDF = DF.withColumn("status", 
                   when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && !(col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "approved")
                  .when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "pending")
                  .when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) >= 0), "expired")
                  .otherwise("null"))

dateDiff is a function that calculates the difference in days between partition_date and item_due_date, which are columns in DF. I am trying to add a new column to DF using when and otherwise, with dateDiff supplying the difference between the dates.

Now, when I run the above code, I get the following error: org.threeten.bp.format.DateTimeParseException: Text 'partition_date' could not be parsed at index 0

I believe the value of the column partition_date is not being converted to a String before it is parsed as a date. Is this what's happening? If so, how do I cast the column value to a String?

Below is the schema of the columns I am using from the DF:

 |-- item_due_date: string (nullable = true)
 |-- past_due: integer (nullable = true)
 |-- item_decision: string (nullable = true)
 |-- partition_date: string (nullable = true)

A data sample of the columns I am using from the DF:

+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
|       1|   0001-01-14|         null|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       0|   2018-03-18|         null|    2017-11-22|
|       1|   2016-11-30|         null|    2017-11-22|
+--------+-------------+-------------+--------------+

I also tried using a custom UDF:

  def status(past_due: Int, item_decision: String, maxPartitionDate: String, item_due_date: String): String = {
      if (past_due == 1 && item_due_date != "NULL") {
        if (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(item_due_date.trim)) < 0) {
          if (item_decision != "NULL") "pending"
          else "approved"
        } else "expired"
      } else "NULL"
    }

val statusUDF = sqlContext.udf.register("statusUDF", status _)

val DF2 = DF.withColumn("status", statusUDF(DF("past_due"),DF("item_decision"),DF("partition_date"),DF("item_due_date")))
DF2.show()

And it throws the following error at the DF2.show statement, every time:

Container exited with a non-zero exit code 50

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1644)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1603)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1592)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
        at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
        at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
        at driver$.main(driver.scala:109)
        at driver.main(driver.scala)

Any help would be appreciated. Thank you!

1
Your function is column-based, so you can use Spark's built-in functions if any of them meets your needs. Otherwise, if you want to work with primitive data types, you will have to use UDFs. - Ramesh Maharjan
I tried using a UDF instead of when and otherwise, but I was facing errors while displaying/saving the dataframe. So I switched to this approach. Is there a way I can solve the error in this approach? - Hemanth
You will have to update your question with a sample dataframe and the schema of the dataframe. That will help you get answers quickly. - Ramesh Maharjan
@Hemanth The function you wrote is not a UDF; you should read more carefully about how to write an appropriate UDF. - Thang Nguyen
I know it's not a UDF. It's a normal scala function to get the difference in dates. @cue Any ideas on how to make it work? - Hemanth

1 Answer

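You can simply use the built-in datediff function to get the difference in days between two columns; you don't need to write your own function or a UDF. The when expression is also modified from yours: the null checks use isNull/isNotNull, because a comparison such as equalTo(null) never evaluates to true.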

import org.apache.spark.sql.functions._
// A string column is usable when it is neither SQL NULL nor the literal text "null".
def hasValue(name: String) = col(name).isNotNull && !(lower(col(name)).equalTo("null"))
// datediff(end, start) is end minus start in days; a negative value means item_due_date is after partition_date.
val daysPastDue = datediff(col("partition_date"), col("item_due_date"))
val dueDateKnown = col("past_due").equalTo(1) && hasValue("item_due_date")
val finalDF = DF.withColumn("status",
  when(dueDateKnown && (daysPastDue < 0) && hasValue("item_decision"), "approved")
    .when(dueDateKnown && (daysPastDue < 0) && !hasValue("item_decision"), "pending")
    .when(dueDateKnown && (daysPastDue >= 0), "expired")
    .otherwise("null"))
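
As for why the original dateDiff fails: a Column is an expression describing a computation, not a row value. Calling partition_date.toString() therefore gives the column's name rather than a date, which is exactly the text quoted in the error message. The function body also runs once on the driver while the expression is being built, not once per row. Here is a minimal sketch of that failure without Spark. It assumes Java 8's java.time in place of org.threeten.bp and uses a plain string as a stand-in for the column's toString; the object and value names are made up for illustration.

```scala
import java.time.LocalDate
import scala.util.Try

object ColumnNameParse {
  // Stand-in for col("partition_date").toString: the column's name, not a row value.
  val columnAsText = "partition_date"
  // A value as it actually appears in the partition_date column.
  val rowValue = "2017-11-22"

  // Parsing the column name fails with a DateTimeParseException.
  val fromColumnName: Try[LocalDate] = Try(LocalDate.parse(columnAsText))
  // Parsing a real row value succeeds.
  val fromRowValue: Try[LocalDate] = Try(LocalDate.parse(rowValue))
}
```

So casting the column to a String would not help; the date arithmetic has to be expressed with column functions such as datediff, or moved into a UDF that receives the row values.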

This logic converts the following dataframe (your sample, with the first two due dates changed to 2017-12-14)

+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
|1       |2017-12-14   |null         |2017-11-22    |
|1       |2017-12-14   |Mitigate     |2017-11-22    |
|1       |0001-01-14   |Mitigate     |2017-11-22    |
|1       |0001-01-14   |Mitigate     |2017-11-22    |
|0       |2018-03-18   |null         |2017-11-22    |
|1       |2016-11-30   |null         |2017-11-22    |
+--------+-------------+-------------+--------------+

into this one, with the status column added:

+--------+-------------+-------------+--------------+--------+
|past_due|item_due_date|item_decision|partition_date|status  |
+--------+-------------+-------------+--------------+--------+
|1       |2017-12-14   |null         |2017-11-22    |pending |
|1       |2017-12-14   |Mitigate     |2017-11-22    |approved|
|1       |0001-01-14   |Mitigate     |2017-11-22    |expired |
|1       |0001-01-14   |Mitigate     |2017-11-22    |expired |
|0       |2018-03-18   |null         |2017-11-22    |null    |
|1       |2016-11-30   |null         |2017-11-22    |expired |
+--------+-------------+-------------+--------------+--------+
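
If you still prefer the UDF route, note that the stack trace in the question only shows the driver side, so the real cause will be in the executor logs. One possible cause (an assumption, not something the trace confirms) is a SQL NULL reaching the function: item_due_date != "NULL" is true for a null reference, and calling .trim on it then throws a NullPointerException. Below is a null-safe sketch of the same status rules as the when expression above, written as plain Scala so it can be checked without a cluster. It assumes Java 8's java.time (swap the imports for org.threeten.bp otherwise), and StatusLogic, clean and parseDate are hypothetical names.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit
import scala.util.Try

object StatusLogic {
  // Treat null references, blanks and the literal text "null" (any case) as missing.
  private def clean(s: String): Option[String] =
    Option(s).map(_.trim).filter(v => v.nonEmpty && !v.equalsIgnoreCase("null"))

  // Parse an ISO date (yyyy-MM-dd); missing or malformed input yields None.
  private def parseDate(s: String): Option[LocalDate] =
    clean(s).flatMap(v => Try(LocalDate.parse(v)).toOption)

  def status(pastDue: java.lang.Integer, itemDecision: String,
             partitionDate: String, itemDueDate: String): String = {
    // partition_date minus item_due_date in days, the same sign as
    // datediff(partition_date, item_due_date); defined only when past_due is 1
    // and both dates are usable.
    val daysPastDue: Option[Long] = for {
      flag <- Option(pastDue) if flag.intValue == 1
      partition <- parseDate(partitionDate)
      due <- parseDate(itemDueDate)
    } yield ChronoUnit.DAYS.between(due, partition)

    daysPastDue match {
      case Some(d) if d >= 0                        => "expired"
      case Some(_) if clean(itemDecision).isDefined => "approved"
      case Some(_)                                  => "pending"
      case None                                     => "null"
    }
  }
}
```

For example, StatusLogic.status(1, "Mitigate", "2017-11-22", "2017-12-14") returns "approved", matching the second row of the table above. It could then be wrapped with Spark's udf helper, for example udf(StatusLogic.status _), though that wiring is untested here.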

I hope this answer is helpful.