3
votes

I have this java code, where a spark UDF takes a Row as an input and returns a Row. There is also a broadcast variable which is a HashMap.

All the UDF does is check whether the broadcast HashMap contains the rowKey; if it does, it returns a new Row with some existing values from the input Row and some values updated from the broadcast HashMap. If not, it returns the input Row as is. I do this because I want to update the row's column values based on the values in the HashMap. Here is the code:

Broadcast variable

final Broadcast<HashMap<String, HashMap<String, String>>> broadcastVariable = jsc.broadcast(someHashMap);

UDF Definition

UDF1<Row, Row> myUDF = new UDF1<Row, Row> () {
    @Override
    public Row call(Row inputRow) {

        String myKey = inputRow.getString(3);

        if (broadcastVariable.value().containsKey(myKey)){
            Map<String, String> valuesToUpdate = broadcastVariable.value().get(myKey);

            String col1 = inputRow.getString(0);
            String col2 = inputRow.getString(1);
            String col3 = inputRow.getString(2);

            for (Map.Entry<String, String> entry : valuesToUpdate.entrySet()) {
                String columnName = entry.getKey();

                switch (columnName) {
                    case "col1":
                        col1 = entry.getValue();
                        break;
                    case "col2":
                        col2 = entry.getValue();
                        break;
                    case "col3":
                        col3 = entry.getValue();
                        break;
                }
            }
            return RowFactory.create(col1, col2, col3, myKey);

        }
        return inputRow;
    }
};

UDF Registration

hiveContext.udf().register("myUDF", myUDF, DataTypes.createStructType(DF1.schema().fields()));

UDF Call

DataFrame DF2 = DF1.select(org.apache.spark.sql.functions.callUDF("myUDF",
        org.apache.spark.sql.functions.struct(DF1.col("col1"),
                DF1.col("col2"),
                DF1.col("col3"),
                DF1.col("myKey"))));

I have the following questions,

  1. How can I pass all the columns in the DataFrame to the UDF without listing them one by one? I'm asking because the actual DataFrame has more than 50 columns. I saw this example, but couldn't get it to work in Java.

  2. Is there a way I can access the row columns by name within the UDF? Right now I'm using getString(int).

  3. The UDF output is a struct named myUDF(struct(col1,col2,col3,myKey)). It gets really long with 50+ columns. How can I alias this?

Any help is appreciated!

2
@agsachin The OP here is asking for a solution in Java whereas the linked thread is in Scala, not exactly a duplicate. – ChrisOdney

2 Answers

2
votes

TL;DR Use Dataset.map (and replace the UDF with a map function).
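
If you are on Spark 2.x, where a DataFrame is just Dataset<Row>, a minimal sketch of that replacement could look like the following; DF1, broadcastVariable and the myKey column are the names from the question, everything else is illustrative:

// A sketch of the map-based approach, assuming Spark 2.x (Dataset<Row>).
import java.util.Map;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

Dataset<Row> DF2 = DF1.map((MapFunction<Row, Row>) inputRow -> {
    String myKey = inputRow.getString(inputRow.fieldIndex("myKey"));
    Map<String, String> valuesToUpdate = broadcastVariable.value().get(myKey);
    if (valuesToUpdate == null) {
        return inputRow;   // nothing to update for this key
    }
    // Rebuild the row, taking updated values from the broadcast map where present
    String[] names = inputRow.schema().fieldNames();
    Object[] values = new Object[names.length];
    for (int i = 0; i < names.length; i++) {
        String updated = valuesToUpdate.get(names[i]);
        values[i] = (updated != null) ? updated : inputRow.get(i);
    }
    return RowFactory.create(values);
}, RowEncoder.apply(DF1.schema()));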


How can I pass all the columns in the dataframe to the UDF without listing them one by one?

dataframe.schema.fieldNames

See Dataset API.
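
In Java, a sketch of the same idea, reusing DF1 and the registered myUDF from the question (Column is org.apache.spark.sql.Column):

// Build the struct from every column of DF1 instead of listing them one by one.
String[] fieldNames = DF1.schema().fieldNames();
Column[] allColumns = new Column[fieldNames.length];
for (int i = 0; i < fieldNames.length; i++) {
    allColumns[i] = org.apache.spark.sql.functions.col(fieldNames[i]);
}

DataFrame DF2 = DF1.select(
        org.apache.spark.sql.functions.callUDF("myUDF",
                org.apache.spark.sql.functions.struct(allColumns)));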

Is there a way I can access the row columns by name within the UDF?

Quoting the scaladoc of Row.fieldIndex:

fieldIndex(name: String): Int
Returns the index of a given field name.

and use the index.
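
In Java that looks roughly like this inside the UDF, assuming the Row passed in carries its schema (column names are the ones from the question):

// Look up values by column name instead of hard-coded positions.
String myKey = inputRow.getString(inputRow.fieldIndex("myKey"));
String col1 = inputRow.getString(inputRow.fieldIndex("col1"));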

It gets really long with 50+ columns. How can I alias this?

Looks like your code would benefit from some refactoring and composition. Working with 50 fields in a single pipeline might get a bit unwieldy.

-2
votes

You don't need to know the column names in advance!

You can have a Row as one of the arguments of your udf. For example:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

val myUdf = udf((row: Row) => <here comes the code inside your udf>)

You call that udf like this:

df.withColumn(newColumnName, myUdf(struct(df.columns map col: _*)))

and then you can access the dataframe row (both structure and data) inside the udf for anything you need, for example converting the row to a map of (column_name -> column_value):

val myUdf = udf((row: Row) => row.getValuesMap(row.schema.fieldNames))
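
Since the question is in Java, a rough Java counterpart of that last snippet, assuming the Row carries its schema (variable names here are illustrative):

// Build a column_name -> column_value map from the Row, the Java way.
java.util.Map<String, Object> valuesByName = new java.util.HashMap<>();
for (String fieldName : row.schema().fieldNames()) {
    valuesByName.put(fieldName, row.getAs(fieldName));
}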