4 votes

I've been using Spark 2.4 for a while and just started switching to Spark 3.0 in the last few days. After switching, I got this error when running udf((x: Int) => x, IntegerType):

Caused by: org.apache.spark.sql.AnalysisException: You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument, and the closure will see the default value of the Java type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. To get rid of this error, you could:
1. use typed Scala UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`
2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { override def call(s: String): Integer = s.length() }, IntegerType)`, if input types are all non primitive
3. set spark.sql.legacy.allowUntypedScalaUDF to true and use this API with caution;
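
For reference, option 1 apparently amounts to dropping the explicit DataType and letting Spark infer it from the closure; something like this (my own minimal sketch, assuming a normal SparkSession is already set up):

import org.apache.spark.sql.functions.udf

// untyped API, rejected by default in Spark 3.0:
// val identityUdf = udf((x: Int) => x, IntegerType)

// typed API (option 1 in the error message): the return type is inferred
// from the closure, so no DataType argument is passed
val identityUdf = udf((x: Int) => x)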

These solutions are proposed by Spark itself, and after googling for a while I ended up on the Spark Migration Guide page:

In Spark 3.0, using org.apache.spark.sql.functions.udf(AnyRef, DataType) is not allowed by default. Remove the return type parameter to automatically switch to typed Scala udf is recommended, or set spark.sql.legacy.allowUntypedScalaUDF to true to keep using it. In Spark version 2.4 and below, if org.apache.spark.sql.functions.udf(AnyRef, DataType) gets a Scala closure with primitive-type argument, the returned UDF returns null if the input values is null. However, in Spark 3.0, the UDF returns the default value of the Java type if the input value is null. For example, val f = udf((x: Int) => x, IntegerType), f($"x") returns null in Spark 2.4 and below if column x is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default.

source: Spark Migration Guide
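
If I read that correctly, the behavior difference should look roughly like this (my own sketch; df is just a made-up DataFrame with a nullable integer column x, and spark is the usual SparkSession as in spark-shell):

import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

val df = Seq[Option[Int]](Some(1), None).toDF("x")  // x is a nullable int column

val f = udf((x: Int) => x, IntegerType)  // the untyped form from the quote
// df.select(f(col("x"))): the null row comes back as null in Spark 2.4,
// but as 0 in Spark 3.0 (with spark.sql.legacy.allowUntypedScalaUDF=true)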

I notice that my usual way of using the functions.udf API, which is udf(AnyRef, DataType), is called an untyped Scala UDF, and the proposed solution, which is udf(AnyRef), is called a typed Scala UDF.

  • To my understanding, the first one looks more strictly typed than the second one: the first has its output type explicitly defined while the second does not, hence my confusion about why it's called untyped.
  • Also, the function passed to udf, which is (x: Int) => x, clearly has its input type defined, yet Spark claims You're using untyped Scala UDF, which does not have the input type information?

Is my understanding correct? Even after more intensive searching, I still can't find any material explaining what an untyped Scala UDF is and what a typed Scala UDF is.

So my questions are: What are they? What are their differences?


2 Answers

2 votes

In a typed Scala UDF, the UDF knows the types of the columns passed as arguments, whereas in an untyped Scala UDF, the UDF does not know the types of the columns passed as arguments.

When creating a typed Scala UDF, the types of the columns passed as arguments and the output of the UDF are inferred from the function's argument and output types, whereas when creating an untyped Scala UDF, there is no type inference at all, either for the arguments or the output.

What can be confusing is that when creating a typed UDF, the types are inferred from the function and not explicitly passed as arguments. To be more explicit, you can write the typed UDF creation as follows:

val my_typed_udf = udf[Int, Int]((x: Int) => x)
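
And for contrast, the untyped creation of the same function would look like the sketch below; the closure is handed over as AnyRef, so the only type Spark knows is the output DataType you pass explicitly (assuming the legacy flag is enabled on Spark 3.0):

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

// untyped: Spark only knows the output DataType we supply;
// the Int input type of the closure is not visible to it
val my_untyped_udf = udf((x: Int) => x, IntegerType)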

Now, let's look at the two points you raised.

To my understanding, the first one (e.g. udf(AnyRef, DataType)) looks more strictly typed than the second one (e.g. udf(AnyRef)): the first one has its output type explicitly defined and the second one does not, hence my confusion about why it's called untyped.

According to the Spark functions Scaladoc, the signatures of the udf functions that transform a function into a UDF are actually, for the first one:

def udf(f: AnyRef, dataType: DataType): UserDefinedFunction 

And for the second one:

def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction

So the second one is actually more typed than the first one, as it takes into account the type of the function passed as argument, whereas the first one erases the function's type.

That's why, for the first one, you need to define the return type: Spark needs this information but can't infer it from the function passed as argument, since the function's type is erased. For the second one, the return type is inferred from the function passed as argument.

Also, the function passed to udf, which is (x: Int) => x, clearly has its input type defined, yet Spark claims You're using untyped Scala UDF, which does not have the input type information?

What is important here is not the function itself, but how Spark creates a UDF from this function.

In both cases, the function to be transformed into a UDF has its input and return types defined, but those types are erased and not taken into account when the UDF is created with udf(AnyRef, DataType).
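
One way to see this erasure: the untyped overload's parameter is declared as AnyRef, so from Spark's point of view the closure could be a function of any arity with any argument types (a small sketch):

// the closure has a precise Scala type...
val f: Int => Int = (x: Int) => x

// ...but udf(AnyRef, DataType) receives it the way this upcast does, so the
// Int input/output types can no longer be recovered from it, which is why the
// DataType must be passed and nulls aren't handled for primitive arguments
val erased: AnyRef = f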

1 vote

This doesn't answer your original question about what the different UDFs are, but if you just want to get rid of the error, in PySpark you can include this line in your script: spark.sql("set spark.sql.legacy.allowUntypedScalaUDF=true").
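
On the Scala side, the same legacy flag can presumably be set directly on the session as well (assuming a SparkSession named spark), for example:

// equivalent settings in Scala (same legacy flag as above)
spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")
// or, mirroring the SQL form:
spark.sql("set spark.sql.legacy.allowUntypedScalaUDF=true")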