Here is a Spark UDF that I'm using to compute a value from a few columns.
def spark_udf_func(s: String, i: Int): Boolean = {
  // I'm returning true regardless of the parameters passed to it.
  true
}
val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _)
val df = sc.parallelize(Array[(Option[String], Option[Int])](
  (Some("Rafferty"), Some(31)),
  (null, Some(33)),
  (Some("Heisenberg"), Some(33)),
  (Some("Williams"), null)
)).toDF("LastName", "DepartmentID")
df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show()
+----------+------------+-----+
| LastName|DepartmentID|valid|
+----------+------------+-----+
| Rafferty| 31| true|
| null| 33| true|
|Heisenberg| 33| true|
| Williams| null| null|
+----------+------------+-----+
Can anyone explain why the valid column is null for the last row?
When I checked the Spark plan, I could see that it contains a case condition: if the second column (DepartmentID) is null, the expression returns null without ever calling the UDF.
== Physical Plan ==
*Project [_1#699 AS LastName#702, _2#700 AS DepartmentID#703, if (isnull(_2#700)) null else UDF(_1#699, _2#700) AS valid#717]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), assertnotnull(input[0, scala.Tuple2, true])._1), true) AS _1#699, unwrapoption(IntegerType, assertnotnull(input[0, scala.Tuple2, true])._2) AS _2#700]
+- Scan ExternalRDDScan[obj#698]
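For reference, guarding at the DataFrame level does avoid the null result, though I'd still like to understand why the guard is needed at all. A minimal sketch of that workaround (same df as above; treating a null DepartmentID as valid is an arbitrary choice, just for illustration):

import org.apache.spark.sql.functions.{lit, when}

df.withColumn("valid",
  when(df.col("DepartmentID").isNull, lit(true)) // decide the null case explicitly
    .otherwise(spark_udf(df.col("LastName"), df.col("DepartmentID")))
).show()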
Why does Spark behave this way?
Why does it happen only for the Integer column and not for the String column (the row with a null LastName still returned true)?
What am I doing wrong here, and what is the proper way to handle nulls inside a UDF when one of its parameters is null?
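For what it's worth, one idea I'm considering is declaring the parameter with a boxed type so that null can actually reach the function instead of Spark short-circuiting the call. A minimal sketch (same df as above; I haven't confirmed this is the recommended approach):

import org.apache.spark.sql.functions.udf

// java.lang.Integer is a reference type, so null should be passed through
// to the function instead of being intercepted by the generated null check.
val spark_udf_boxed = udf((s: String, i: java.lang.Integer) => {
  if (i == null) {
    true // the null branch is now reachable inside the UDF
  } else {
    true // same constant result as the original UDF
  }
})

df.withColumn("valid", spark_udf_boxed(df.col("LastName"), df.col("DepartmentID"))).show()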