I have this Java code in which a Spark UDF takes a Row as input and returns a Row. There is also a broadcast variable, which is a HashMap.

The UDF checks whether the broadcast HashMap contains the row's key. If it does, the UDF returns a new Row that keeps some values from the input row and replaces others with values from the broadcast HashMap. If it does not, the UDF returns the input row unchanged. I do this because I want to update the row's column values based on the values in the HashMap. Here is the code:
Broadcast variable

```java
final Broadcast<HashMap<String, HashMap<String, String>>> broadcastVariable = jsc.broadcast(someHashMap);
```
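To make the shape of `someHashMap` concrete: judging from the UDF below, the outer key is the row key (`myKey`) and the inner map goes from a column name to that column's replacement value. Here is a minimal plain-Java sketch with made-up sample data. The class name `SampleUpdates` and all keys and values are hypothetical.

```java
import java.util.HashMap;

// Hypothetical helper that builds sample data in the same shape as someHashMap:
// row key -> (column name -> replacement value).
final class SampleUpdates {
    static HashMap<String, HashMap<String, String>> build() {
        HashMap<String, HashMap<String, String>> byKey = new HashMap<>();

        // Rows whose key is "k1" get a new col2; col1 and col3 are left alone.
        HashMap<String, String> forK1 = new HashMap<>();
        forK1.put("col2", "updatedB");
        byKey.put("k1", forK1);

        // Rows whose key is "k2" get new values for col1 and col3.
        HashMap<String, String> forK2 = new HashMap<>();
        forK2.put("col1", "updatedA");
        forK2.put("col3", "updatedC");
        byKey.put("k2", forK2);

        // Rows with any other key have no entry and pass through unchanged.
        return byKey;
    }
}
```

A map built this way is what would be handed to `jsc.broadcast(...)`. Only the column names that need to change appear in each inner map.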
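UDF Definition

```java
import java.util.Map;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.api.java.UDF1;

UDF1<Row, Row> myUDF = new UDF1<Row, Row>() {
    @Override
    public Row call(Row inputRow) {
        // Positions follow the struct passed to the UDF: col1, col2, col3, myKey.
        String myKey = inputRow.getString(3);
        if (broadcastVariable.value().containsKey(myKey)) {
            // Column name -> replacement value for this key.
            Map<String, String> valuesToUpdate = broadcastVariable.value().get(myKey);
            String col1 = inputRow.getString(0);
            String col2 = inputRow.getString(1);
            String col3 = inputRow.getString(2);
            for (Map.Entry<String, String> entry : valuesToUpdate.entrySet()) {
                String columnName = entry.getKey();
                switch (columnName) {
                    case "col1":
                        col1 = entry.getValue();
                        break;
                    case "col2":
                        col2 = entry.getValue();
                        break;
                    case "col3":
                        col3 = entry.getValue();
                        break;
                    default:
                        // Any other column name in the map is ignored.
                        break;
                }
            }
            // Build a new Row in the same column order as the input struct.
            return RowFactory.create(col1, col2, col3, myKey);
        }
        // No entry for this key: return the input row unchanged.
        return inputRow;
    }
};
```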
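The `switch` above hard-codes three column names, which will not scale to 50+ columns. One possible generalization is sketched below in plain Java. It builds a name-to-position lookup from the field names and overwrites whichever columns the inner map names. Inside the UDF, `fieldNames` would presumably come from `inputRow.schema().fieldNames()` and `values` from `inputRow.get(i)` at each position, with the result passed to `RowFactory.create(...)`. That wiring is untested here and assumes the input Row carries its schema, since `schema()` can return null otherwise. The names `RowOverride`, `update` and `applyUpdates` are hypothetical. Unlike the `switch`, this version would also overwrite `myKey` if the inner map contained that name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical name-based replacement for the switch statement.
final class RowOverride {
    // Returns a copy of values in which every column named in updates is replaced.
    // Names in updates that are not in fieldNames are ignored.
    static Object[] applyUpdates(String[] fieldNames, Object[] values,
                                 Map<String, String> updates) {
        // Column name -> position, built from the field names.
        Map<String, Integer> indexByName = new HashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            indexByName.put(fieldNames[i], i);
        }
        // Copy so the caller's array is never modified.
        Object[] result = Arrays.copyOf(values, values.length);
        for (Map.Entry<String, String> entry : updates.entrySet()) {
            Integer idx = indexByName.get(entry.getKey());
            if (idx != null) {
                result[idx] = entry.getValue();
            }
        }
        return result;
    }

    // Mirrors the UDF: the same array comes back when the key has no entry.
    static Object[] update(String[] fieldNames, Object[] values, String key,
                           Map<String, ? extends Map<String, String>> byKey) {
        // A single get replaces the containsKey + get pair.
        Map<String, String> updates = byKey.get(key);
        return updates == null ? values : applyUpdates(fieldNames, values, updates);
    }
}
```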
UDF Registration

```java
// The return type is a struct with the same fields as DF1.
hiveContext.udf().register("myUDF", myUDF, DataTypes.createStructType(DF1.schema().fields()));
```
UDF Call

```java
DataFrame DF2 = DF1.select(
        org.apache.spark.sql.functions.callUDF("myUDF",
                org.apache.spark.sql.functions.struct(
                        DF1.col("col1"),
                        DF1.col("col2"),
                        DF1.col("col3"),
                        DF1.col("myKey"))));
```
I have the following questions:

1. How can I pass all the columns of the DataFrame to the UDF without listing them one by one? I ask because the actual DataFrame has more than 50 columns. I saw this example, but couldn't get it to work in Java.
2. Is there a way to access the row's columns by name within the UDF? Right now I'm using `getString(int)`.
3. The UDF output is a struct column named `myUDF(struct(col1,col2,col3,myKey))`, and that name gets really long with 50+ columns. How can I alias it?
Any help is appreciated!