
Scala/Spark: using a UDF in spark-shell for array manipulation on a dataframe column

df.printSchema

root
|-- x: timestamp (nullable = true)
|-- date_arr: array (nullable = true)
|    |-- element: timestamp (containsNull = true)

Sample data:

|x                      | date_arr                                                              |  
|---------------------- |---------------------------------------------------------------------- |  
| 2009-10-22 19:00:00.0 | [2009-08-22 19:00:00.0, 2009-09-19 19:00:00.0, 2009-10-24 19:00:00.0] |  
| 2010-10-02 19:00:00.0 | [2010-09-25 19:00:00.0, 2010-10-30 19:00:00.0]                        |  
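
For anyone reproducing this, a dataframe with this schema and data can be built in spark-shell roughly like so (a sketch; it assumes sqlContext and its implicits are available, and the ts helper is only for illustration):

import java.sql.Timestamp
import sqlContext.implicits._

// illustrative helper to parse the sample timestamps
def ts(s: String): Timestamp = Timestamp.valueOf(s)

val df = Seq(
  (ts("2009-10-22 19:00:00.0"), Seq(ts("2009-08-22 19:00:00.0"), ts("2009-09-19 19:00:00.0"), ts("2009-10-24 19:00:00.0"))),
  (ts("2010-10-02 19:00:00.0"), Seq(ts("2010-09-25 19:00:00.0"), ts("2010-10-30 19:00:00.0")))
).toDF("x", "date_arr")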

In udf.jar, I have this function to get the ceiling date in date_arr according to x:

import java.sql.Timestamp
import org.apache.hadoop.hive.ql.exec.UDF

class CeilToDate extends UDF {
  // return the latest date in arr that falls before x
  def evaluate(arr: Seq[Timestamp], x: Timestamp): Timestamp = {
    arr.filter(_.before(x)).last
  }
}

Add the jar to spark-shell: spark-shell --jars udf.jar

In spark-shell, I create a HiveContext as val hc = new HiveContext(spc) and register the function: hc.sql("create temporary function ceil_to_date as 'com.abc.udf.CeilToDate'")
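
For reference, the full setup in spark-shell looks roughly like this (a sketch; spc is the SparkContext already available in the shell, and registering df as a temporary table is assumed so it can be queried by name):

import org.apache.spark.sql.hive.HiveContext

// spc is the SparkContext provided by spark-shell
val hc = new HiveContext(spc)

// register the Hive UDF packaged in udf.jar under the name ceil_to_date
hc.sql("create temporary function ceil_to_date as 'com.abc.udf.CeilToDate'")

// make df queryable by name from SQL
df.registerTempTable("df")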

When I run the query hc.sql("select ceil_to_date(date_arr, x) as ceildate from df").show, I expect a dataframe like:

|ceildate              |        
|----------------------|  
|2009-09-19 19:00:00.0 |  
|2010-09-25 19:00:00.0 |  

However, it throws this error:

org.apache.spark.sql.AnalysisException: No handler for Hive udf class com.abc.udf.CeilToDate because: No matching method for class com.abc.udf.CeilToDate with (array<timestamp>, timestamp). Possible choices: FUNC(struct<>, timestamp)
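
From the error message, Hive's reflection-based method matching apparently does not recognize the Scala Seq parameter (it shows up as struct<>). One untested workaround sketch, assuming Hive maps an array<timestamp> argument to java.util.List, would be to change the evaluate signature accordingly:

import java.sql.Timestamp
import org.apache.hadoop.hive.ql.exec.UDF
import scala.collection.JavaConverters._

class CeilToDate extends UDF {
  // hypothetical variant: accept the Java list type instead of a Scala Seq
  def evaluate(arr: java.util.List[Timestamp], x: Timestamp): Timestamp = {
    arr.asScala.filter(_.before(x)).last
  }
}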


1 Answer


Why go through all the complexity of creating a UDF jar and including it in spark-shell? You can just create a udf in spark-shell and use it on your dataframe.

Given that you have a dataframe like this:

scala> df.show(false)
+---------------------+---------------------------------------------------------------------+
|x                    |date_arr                                                             |
+---------------------+---------------------------------------------------------------------+
|2009-10-22 19:00:00.0|[2009-08-22 19:00:00.0, 2009-09-19 19:00:00.0, 2009-10-24 19:00:00.0]|
|2010-10-02 19:00:00.0|[2010-09-25 19:00:00.0, 2010-10-30 19:00:00.0]                       |
+---------------------+---------------------------------------------------------------------+

You can create a udf function in spark-shell, but first you need three imports:

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import java.sql.Timestamp
import java.sql.Timestamp

scala> import scala.collection._
import scala.collection._

Then you can create the udf function:

scala> def ceil_to_date = udf((arr: mutable.WrappedArray[Timestamp], x: Timestamp) => arr.filter(_.before(x)).last)
ceil_to_date: org.apache.spark.sql.expressions.UserDefinedFunction
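
Note that .last will fail on rows where no element of date_arr precedes x (and it assumes date_arr is sorted). If that can happen in your data, a defensive variant (a sketch) returns null instead:

def ceil_to_date_safe = udf((arr: mutable.WrappedArray[Timestamp], x: Timestamp) => arr.filter(_.before(x)).lastOption.orNull)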

Your desired output dataframe can be achieved in different ways, but the simplest is to use select:

scala> df.select(ceil_to_date(col("date_arr"), col("x")).as("ceildate")).show(false)
+---------------------+
|ceildate             |
+---------------------+
|2009-09-19 19:00:00.0|
|2010-09-25 19:00:00.0|
+---------------------+
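
If you want to keep the original columns alongside the new one, the same udf works with withColumn (a quick sketch):

df.withColumn("ceildate", ceil_to_date(col("date_arr"), col("x"))).show(false)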

I hope the answer is helpful.