
I am trying to use a udf that would be the equivalent of:

df.select(when(col("abc").isNotNull and col("abc") =!= "" and col("age") <= 18, 1).otherwise(0).alias("something"))

I declared the udf like:

//return Int 0 or 1 if conditions are true 
val myudf_x = udf((col_name: String, col_value: String, num: Int) => {
  when(col_name.isNotNull and col_name =!= "" and col_value < num, 1).otherwise(0)

})

usage:

df.select(
  "col_abc",
  myudf_x(col("col_abc"), col("age"), 18).alias("something")
)

but I get an error:

Schema for type org.apache.spark.sql.Column is not supported

I've also tried declaring the udf with String types instead of Column types.

What is the issue?

thanks


1 Answer


A simple distinction:

  • Expressions operate on SQL types (Columns).
  • udfs operate on external (Scala/JVM) types.

If you want a function using expression DSL:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.when

// You can use function:
// def f(col_name: Column, col_value: Column, num: Column) = ???
// I used closure syntax to highlight difference in types
val f: (Column, Column, Column) => Column = 
  (col_name: Column, col_value: Column, num: Column) =>  when(
    col_name.isNotNull and col_name =!= "unknown" and col_value < num, 
    1
  ).otherwise(0)
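Such a function plugs straight into `select`; a minimal self-contained sketch (the sample data and the "unknown" sentinel are assumptions, and note the literal `18` must be wrapped in `lit()` so it becomes a `Column`):

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
import spark.implicits._

val f: (Column, Column, Column) => Column =
  (name, value, num) =>
    when(name.isNotNull and name =!= "unknown" and value < num, 1).otherwise(0)

// hypothetical sample data using the question's column names
val df = Seq(("abc", 10), ("unknown", 5), (null: String, 3)).toDF("col_abc", "age")

// "abc" passes all three conditions -> 1; "unknown" and null rows -> 0
val result = df.select(col("col_abc"), f(col("col_abc"), col("age"), lit(18)).alias("something"))
result.show()
```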

If you prefer a udf instead:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

val g: UserDefinedFunction = udf(
  (col_name: String, col_value: String, num: Int) => {
    if (col_name != null && col_name != "unknown" && col_value < num) 1 else 0
  }
)

but in its current form this udf won't type check: col_value is a String and num is an Int, and they cannot be compared with <.

Perhaps you wanted col_value.cast("int") < num (in the expression version) or col_value.toInt < num (in the udf)?
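With the comparison fixed, the udf variant might look like the sketch below. Keeping the logic as a plain Scala function makes it testable without Spark; wrapping `toInt` in `Try` guards against non-numeric strings (the "unknown" sentinel is an assumption carried over from the example above):

```scala
import scala.util.Try
import org.apache.spark.sql.functions.udf

// pure Scala logic: null-safe, and non-numeric values simply fail the check
val check: (String, String, Int) => Int = (name, value, num) =>
  if (name != null && name != "unknown" && Try(value.toInt).toOption.exists(_ < num)) 1 else 0

val g = udf(check)
// usage: df.select(g(col("col_abc"), col("age"), lit(18)).alias("something"))
```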