8 votes

Here is a Spark UDF that I'm using to compute a value from a few columns.

def spark_udf_func(s: String, i: Int): Boolean = {
    // I'm returning true regardless of the parameters passed to it.
    true
}

val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _)

val df = sc.parallelize(Array[(Option[String], Option[Int])](
  (Some("Rafferty"), Some(31)), 
  (null, Some(33)), 
  (Some("Heisenberg"), Some(33)),  
  (Some("Williams"), null)
)).toDF("LastName", "DepartmentID")

df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show()
+----------+------------+-----+
|  LastName|DepartmentID|valid|
+----------+------------+-----+
|  Rafferty|          31| true|
|      null|          33| true|
|Heisenberg|          33| true|
|  Williams|        null| null|
+----------+------------+-----+

Can anyone explain why the valid column is null for the last row?

When I checked the Spark plan, I could see that it contains a conditional: if the second column (DepartmentID) is null, return null instead of calling the UDF.

== Physical Plan ==

*Project [_1#699 AS LastName#702, _2#700 AS DepartmentID#703, if (isnull(_2#700)) null else UDF(_1#699, _2#700) AS valid#717]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), assertnotnull(input[0, scala.Tuple2, true])._1), true) AS _1#699, unwrapoption(IntegerType, assertnotnull(input[0, scala.Tuple2, true])._2) AS _2#700]
   +- Scan ExternalRDDScan[obj#698]

Why does Spark behave this way?
Why does this happen only for the Integer column?
What am I doing wrong here, and what is the proper way to handle nulls in a UDF when a parameter is null?


2 Answers

9 votes

The issue is that null is not a valid value for Scala's Int (which is the backing type here), while it is a valid value for String. Scala's Int is equivalent to Java's primitive int and must always have a value. This means the UDF can't be called when the value is null, so Spark skips the call and the result remains null.
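
To see the distinction, here is a minimal Scala sketch (nothing Spark-specific, just the type system):

val s: String = null             // compiles: String is a reference type
val j: java.lang.Integer = null  // compiles: Integer is a boxed reference type
// val i: Int = null             // does not compile: Int is a primitive and cannot hold null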

There are two ways to solve this:

  1. Change the function to accept java.lang.Integer (which is an object and can be null).
  2. If you can't change the function, use when/otherwise to do something special when the value is null, e.g. when(col("int col").isNull, someValue).otherwise(the original call). Both options are sketched below.
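
A minimal sketch of both options, reusing spark_udf and the column names from the question (false stands in for someValue here, as an assumed default for the null case):

import org.apache.spark.sql.functions.{col, udf, when}

// Option 1: a boxed-Integer UDF, so null can actually reach the function
val spark_udf_boxed = udf((s: String, i: java.lang.Integer) => i != null)

// Option 2: keep the original UDF and handle null before it is called
df.withColumn("valid",
  when(col("DepartmentID").isNull, false)
    .otherwise(spark_udf(col("LastName"), col("DepartmentID")))
).show()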

A good explanation of this can be found here.

0 votes

To accept null, use java.lang.Integer (the Java boxed type) instead of Scala's Int:

def spark_udf_func(s: String, i: Integer): Boolean = {
    // Return false when the DepartmentID is null, true otherwise.
    if (i == null) false else true
}
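
A sketch of wiring it up, reusing the DataFrame from the question. With a boxed parameter, Spark should no longer wrap the call in the if (isnull(...)) null else UDF(...) guard seen in the physical plan, so the UDF itself receives the null:

val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _)

df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show()
// Expected: the (Williams, null) row now shows valid = false instead of null.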