22
votes

Similar question as the one here, but I don't have enough points to comment there.

According to the latest Spark documentation, a udf can be used in two different ways: one with SQL and another with a DataFrame. I have found multiple examples of how to use a udf with SQL, but I have not been able to find any on how to use a udf directly on a DataFrame.

The solution provided by the OP on the question linked above uses __callUDF()__, which is _deprecated_ and will be removed in Spark 2.0 according to the Spark Java API documentation. There, it says:

"since it's redundant with udf()"

So this means I should be able to use __udf()__ to call my udf, but I can't figure out how to do that. I have not stumbled on anything that spells out the syntax for Java Spark programs. What am I missing?

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
.
.    
UDF1 mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?
1
It is not. Check the signatures carefully :) Some example code? UDF + data? Some formatting? – zero323
Added code to clarify what I'm asking. As for the complaining part, I have a nagging feeling that I'm not doing it right. It should not take hours to figure out how to do things in Java Spark. I think I'm missing something: some book(s), some documentation somewhere, some source of knowledge that will make the clues I get from my IDE sufficient to do things without having to google for hours. Everything I find is Scala, and it's not at all clear to me how to do the same things in Java. – Kai
Well, technically speaking, Scala classes are valid Java classes, which means they can be used directly in Java. The problem is that Scala is a much richer language than Java, so many things cannot be done easily without unwrapping all the Scala magic. – zero323
So you're telling me that I do need to move to Scala... It does seem that this would be a better investment of time than continuing to shoehorn Spark into Java code. Thank you. – Kai
Not necessarily, but it can be easier for you than dealing with Scala internals. – zero323

1 Answer

35
votes

Spark >= 2.3

A Scala-style udf can be invoked directly:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import scala.collection.Seq;

// headOption() returns a Scala Option; Spark's type converters unwrap it,
// so an empty array yields null instead of throwing.
UserDefinedFunction mode = udf(
  (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);

df.select(mode.apply(col("vs"))).show();
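
For context, a minimal sketch of how this might be exercised end to end; the SparkSession variable spark and the sample data are assumptions for illustration, not part of the original question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical one-row DataFrame with an array column "vs" (assumed setup).
Dataset<Row> df = spark.sql("SELECT array('x', 'y', 'z') AS vs");

// Apply the UDF directly to the column; the first element, "x", is returned.
df.select(mode.apply(col("vs"))).show();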

Spark < 2.3

Even if we assume that your UDF is useful and cannot be replaced by a simple getItem call (a sketch of that alternative follows the snippet below), it has an incorrect signature. Array columns are exposed using Scala's WrappedArray, not plain Java arrays, so you have to adjust the signature:

UDF1<Seq<String>, String> mode = new UDF1<Seq<String>, String>() {
  public String call(final Seq<String> types) throws Exception {
    // headOption() would return a Scala Option rather than a String, so
    // return the first element directly (or null for an empty array).
    return types.isEmpty() ? null : types.apply(0);
  }
};
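
For comparison, the getItem route mentioned above needs no UDF at all; a minimal sketch, assuming the array column is named vs:

// Select the first element of the array column directly, no UDF required.
df.select(col("vs").getItem(0)).show();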

If the UDF is already registered:

sqlContext.udf().register("mode", mode, DataTypes.StringType);

you can simply use callUDF (the name-based callUDF(String, Column...) overload introduced in 1.5, not the deprecated variant mentioned in the question) to call it by name:

df.select(callUDF("mode", col("vs"))).show();

You can also use it in selectExpr:

df.selectExpr("mode(vs)").show();
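
And because the UDF is registered by name on the sqlContext, it should also work from raw SQL once the DataFrame is exposed as a table; a sketch, where the table name df_table is an assumption:

// Hypothetical temp table name, chosen only for illustration.
df.registerTempTable("df_table");
sqlContext.sql("SELECT mode(vs) FROM df_table").show();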