
I'm writing a filter function for a complex JSON dataset with lots of inner structures. Passing individual columns is too cumbersome.

So I declared the following UDF:

val records: DataFrame = sqlContext.jsonFile("...")
def myFilterFunction(r: Row): Boolean = ???
sqlContext.udf.register("myFilter", (r: Row) => myFilterFunction(r))

Intuitively I'm thinking it will work like this:


What is the actual syntax?

You have to use the struct() function to construct the row when you call the UDF. Follow these steps.

Import Row, plus struct and callUDF from the functions object:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

Define the UDF:

def myFilterFunction(r: Row): Boolean = r.get(0) == r.get(1)

Register the UDF:

sqlContext.udf.register("myFilterFunction", myFilterFunction _)

Create the DataFrame:

val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")

Use the UDF. When you want all columns to be passed to the UDF, wrap them in a single struct:

records.filter(callUDF("myFilterFunction", struct(records.columns.map(records(_)): _*))).show()


+------+------+
|  text| text2|
+------+------+
|sachin|sachin|
+------+------+
scala> inputDF
res40: org.apache.spark.sql.DataFrame = [email: string, first_name: string ... 3 more fields]

scala> inputDF.printSchema
root
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)

Now I would like to filter the rows based on the gender field. I can do that with .filter($"gender" === "Male"), but I would like to do it with .filter(function).

So I defined my anonymous functions:

val isMaleRow = (r: Row) => r.getAs[String]("gender") == "Male"

val isFemaleRow = (r: Row) => r.getAs[String]("gender") == "Female"



I feel this could be done in a better way, i.e. without declaring a UDF and invoking it.
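The predicates themselves are ordinary functions from a row to a Boolean, so their logic can be checked without a cluster. The following is a plain-Python sketch, not Spark code: it assumes each row is a dict (the shape PySpark's `Row.asDict()` returns), and the names `is_male_row`, `is_female_row`, `filter_rows` and the sample records are hypothetical.

```python
from typing import Any, Callable, Dict, Iterable, List

# A row is modelled as a plain dict here (assumption: mirrors Row.asDict()).
RowDict = Dict[str, Any]


def is_male_row(r: RowDict) -> bool:
    # Compare the gender field as a string, like getAs[String]("gender") in Scala
    return r.get("gender") == "Male"


def is_female_row(r: RowDict) -> bool:
    return r.get("gender") == "Female"


def filter_rows(rows: Iterable[RowDict], pred: Callable[[RowDict], bool]) -> List[RowDict]:
    # Keep only the rows for which the predicate returns True,
    # which is what filter(function) does for each row
    return [r for r in rows if pred(r)]


# Hypothetical sample data with the same fields as inputDF
sample = [
    {"email": "a@example.com", "first_name": "Ann", "gender": "Female", "id": 1, "last_name": "Lee"},
    {"email": "b@example.com", "first_name": "Bob", "gender": "Male", "id": 2, "last_name": "Kim"},
]

print([r["id"] for r in filter_rows(sample, is_male_row)])  # [2]
```

On the Spark side, in Spark 2.x and later a Scala `DataFrame` is a `Dataset[Row]`, whose `filter(func: T => Boolean)` overload accepts such a function directly, e.g. `inputDF.filter(isMaleRow)`, with no UDF registration. PySpark's `DataFrame.filter` only takes a Column or a SQL string, so a Python predicate has to go through the RDD API instead, e.g. `df.rdd.filter(lambda row: is_male_row(row.asDict()))`.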


In addition to the first answer: when we want all columns to be passed to the UDF, we can use


If you want to perform an action on the whole row and process it in a distributed way, pass each row of the DataFrame to a function as a struct, then convert it to a dictionary to execute the specific action. It is very important to call the collect method on the final DataFrame, because Spark evaluates lazily and does not process the data unless you explicitly trigger an action.

In my case, I needed to send each row of a DataFrame to an index as a dictionary object:

  1. Import the libraries.
  2. Declare the UDF; its lambda must receive the row structure.
  3. Execute the specific function; in this case, send a dictionary (the row structure converted to a dict) to the index.
  4. On the source DataFrame, call withColumn, which tells Spark to execute the UDF on each row before collect is called; this lets the function run in a distributed way. Don't forget to assign the result to another DataFrame variable.
  5. Call collect to trigger the computation, which runs the function on the executors.
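from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType

# sendToES is the author's own function (not shown); it must return an int
myUdf = udf(lambda row: sendToES(row.asDict()), IntegerType())
# Pack every column into a single struct so the UDF receives the whole row
dfWithControlCol = df.withColumn("control_col", myUdf(struct([df[x] for x in df.columns])))
# Trigger the lazy computation so the UDF actually runs
dfWithControlCol.collect()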
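Why the final collect matters can be seen with a plain-Python analogy (no Spark involved): the built-in `map` is lazy in much the same way `withColumn` with a UDF is, so the side effect does not happen until something consumes the result. `send_to_es` below is a hypothetical stand-in for the author's `sendToES`, which the answer does not show; it records the document locally instead of contacting an index and returns an int to match the `IntegerType` declared for the UDF.

```python
from typing import Any, Dict, Iterator, List

# Hypothetical stand-in for sendToES: it stores the document locally
# instead of sending it to a real index.
indexed: List[Dict[str, Any]] = []


def send_to_es(doc: Dict[str, Any]) -> int:
    indexed.append(doc)
    return 1  # arbitrary "sent" status; an int, matching IntegerType


rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

# Like withColumn(...) with a UDF, map() only describes the work.
control_col: Iterator[int] = map(send_to_es, rows)
print(len(indexed))  # prints 0, nothing has been sent yet

# Consuming the iterator plays the role of collect(): the side effects run now.
results = list(control_col)
print(len(indexed), results)  # 2 [1, 1]
```

In Spark itself, when the UDF exists only for its side effect, `df.foreach(lambda row: sendToES(row.asDict()))` is an alternative worth considering: `foreach` is an action, so it runs on the executors without adding a control column or returning every row to the driver the way `collect()` does. Either way, Spark may re-run a task after a failure, so the indexing call should tolerate being executed more than once.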