30 votes

I'm writing a filter function for a complex JSON dataset with lots of inner structures. Passing individual columns is too cumbersome.

So I declared the following UDF:

val records: DataFrame = sqlContext.jsonFile("...")
def myFilterFunction(r: Row): Boolean = ???
sqlContext.udf.register("myFilter", (r: Row) => myFilterFunction(r))

Intuitively, I'm thinking it will work like this:

records.filter("myFilter(*)=true")

What is the actual syntax?

Could you specify your filter function a bit more? Using Row throws away a lot of optimizations a DataFrame does for you. – Reactormonk
The filter is pretty complex. The structure of the record is several Map fields with a bunch of key-value pairs in them. – Michael Zeltser

4 Answers

35 votes

You have to use the struct() function to construct the row when calling the function; follow these steps.

Import Row:

import org.apache.spark.sql._

Define the UDF

def myFilterFunction(r:Row) = {r.get(0)==r.get(1)} 

Register the UDF

sqlContext.udf.register("myFilterFunction", myFilterFunction _)

Create the dataFrame

val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")

Use the UDF

records.filter(callUdf("myFilterFunction",struct($"text",$"text2"))).show

When you want all columns to be passed to the UDF:

records.filter(callUdf("myFilterFunction",struct(records.columns.map(records(_)) : _*))).show 

Result:

+------+------+
|  text| text2|
+------+------+
|sachin|sachin|
+------+------+
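
A side note: callUdf is the old Spark 1.x spelling; it was later replaced by callUDF. On newer Spark versions the same call would look roughly like this (a sketch, reusing the records DataFrame and the registration from above):

import org.apache.spark.sql.functions.{callUDF, col, struct}

// Same filter, using the newer callUDF name instead of callUdf.
records.filter(callUDF("myFilterFunction", struct(col("text"), col("text2")))).show()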
3 votes

scala> inputDF
res40: org.apache.spark.sql.DataFrame = [email: string, first_name: string ... 3 more fields]

scala> inputDF.printSchema
root
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)

Now, I would like to filter the rows based on the gender field. I can accomplish that with .filter($"gender" === "Male"), but I would like to do it with .filter(function).

So, I defined my anonymous functions:

val isMaleRow = (r: Row) => { r.getAs[String]("gender") == "Male" }

val isFemaleRow = (r: Row) => { r.getAs[String]("gender") == "Female" }

inputDF.filter(isMaleRow).show()

inputDF.filter(isFemaleRow).show()

I felt the requirement could be met in a better way, i.e. without declaring a UDF and invoking it.
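
This works because in Spark 2.x a DataFrame is just a Dataset[Row], so the typed filter(func: Row => Boolean) overload is available directly. A minimal sketch, assuming the same inputDF as above:

import org.apache.spark.sql.Row

// Typed filter straight on the DataFrame; no UDF registration needed.
inputDF.filter((r: Row) => r.getAs[String]("gender") == "Male").show()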

1 vote

In addition to the first answer: when we want all columns to be passed to the UDF, we can use

 struct("*")
0 votes

If you want to take an action over the whole row and process it in a distributed way, take the row from the DataFrame, send it to a function as a struct, and then convert it to a dictionary to execute the specific action. It is very important to call collect on the final DataFrame, because Spark uses lazy evaluation and won't process the full data unless you tell it to explicitly.

In my case I needed to send each row of a DataFrame to be indexed as a dictionary object:

  1. Import the libraries.
  2. Declare the UDF; the lambda must receive the row structure.
  3. Execute the specific function, in this case sending a dictionary (the row structure converted to a dict) to be indexed.
  4. Call withColumn on the source DataFrame, which tells Spark to run the function on each row before collect is called; this is what allows the function to run in a distributed way. Don't forget to assign the result to another DataFrame variable.
  5. Call the collect method to trigger the process and distribute the function.
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType

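# 'sendToES' is the author's own indexing function (not shown); the lambda
# receives the whole row as a struct and converts it to a dict before indexing.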
myUdf = udf(lambda row: sendToES(row.asDict()), IntegerType())
dfWithControlCol = df.withColumn("control_col", myUdf(struct([df[x] for x in df.columns])))
dfWithControlCol.collect()