I've been using Spark 2.4 for a while and started migrating to Spark 3.0 a few days ago. After the switch, running `udf((x: Int) => x, IntegerType)` fails with this error:
Caused by: org.apache.spark.sql.AnalysisException: You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument, and the closure will see the default value of the Java type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. To get rid of this error, you could:
1. use typed Scala UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`
2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { override def call(s: String): Integer = s.length() }, IntegerType)`, if input types are all non primitive
3. set spark.sql.legacy.allowUntypedScalaUDF to true and use this API with caution;
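Written out in full, the three suggestions from the error message look roughly like the sketch below. The local `SparkSession`, the object name, and the value names are my own assumptions for illustration. I'm also assuming the legacy flag can be set at runtime through `spark.conf.set`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

object UdfWorkaroundsSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session, used only for this illustration.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("udf-workarounds-sketch")
      .getOrCreate()

    // Option 1: "typed" Scala UDF, i.e. no return type argument.
    val typedUdf = udf((x: Int) => x)

    // Option 2: Java UDF API with boxed (non-primitive) input and output types.
    val javaUdf = udf(
      new UDF1[String, Integer] {
        override def call(s: String): Integer = s.length
      },
      IntegerType)

    // Option 3: re-enable the "untyped" API through the legacy flag.
    // Assumption: the flag is picked up when set on the active session.
    spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")
    val untypedUdf = udf((x: Int) => x, IntegerType)

    spark.stop()
  }
}
```

Only option 3 keeps the original `udf(AnyRef, DataType)` call unchanged. The other two change how the function is declared.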
These solutions are proposed by Spark itself, and after googling for a while I found this passage in the Spark Migration Guide:
In Spark 3.0, using org.apache.spark.sql.functions.udf(AnyRef, DataType) is not allowed by default. Remove the return type parameter to automatically switch to typed Scala udf is recommended, or set spark.sql.legacy.allowUntypedScalaUDF to true to keep using it. In Spark version 2.4 and below, if org.apache.spark.sql.functions.udf(AnyRef, DataType) gets a Scala closure with primitive-type argument, the returned UDF returns null if the input values is null. However, in Spark 3.0, the UDF returns the default value of the Java type if the input value is null. For example, val f = udf((x: Int) => x, IntegerType), f($"x") returns null in Spark 2.4 and below if column x is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default.
source: Spark Migration Guide
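The "default value of the Java type" behavior that the guide mentions can be reproduced in plain Scala, without Spark. The sketch below is only my attempt to illustrate that one sentence. `callErased` is a hypothetical helper, and it says nothing about how Spark itself inspects or invokes the closure.

```scala
object NullUnboxingSketch {
  // Hypothetical helper: calls a one-argument closure through its erased
  // Function1[Any, Any] interface, the way a caller that only holds an
  // AnyRef (with no static input type) would have to.
  def callErased(f: AnyRef, arg: Any): Any =
    f.asInstanceOf[Any => Any].apply(arg)

  def main(args: Array[String]): Unit = {
    val identityInt = (x: Int) => x

    // A boxed Int is unboxed, passed through, and boxed again.
    println(callErased(identityInt, 42))   // prints 42

    // null is unboxed to the primitive default, so the closure sees 0.
    println(callErased(identityInt, null)) // prints 0
  }
}
```

So a closure with a primitive `Int` parameter never sees `null` when it is called this way. It sees `0`, which matches the result the guide describes for Spark 3.0.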
I notice that my usual way of using the functions.udf API, which is udf(AnyRef, DataType), is called an untyped Scala UDF, while the proposed solution, which is udf(AnyRef), is called a typed Scala UDF.
- To my understanding, the first one looks more strictly typed than the second: the first has its output type explicitly defined and the second does not. Hence my confusion about why it's called untyped.
- Also, the function passed to `udf`, which is `(x: Int) => x`, clearly has its input type defined, yet Spark claims "You're using untyped Scala UDF, which does not have the input type information".
Is my understanding correct? Even after more intensive searching, I still can't find any material explaining what an untyped Scala UDF is and what a typed Scala UDF is.
So my questions are: What are they? What are their differences?