
I have a Spark DataFrame, DF1, with millions of rows. Each row has up to 100 columns.

col1 | col2 | col3 | ... | colN
v11  | v12  | v13  | ... | v1N
v21  | v22  | v23  | ... | v2N
...  | ...  | ...  | ... | ...

I also have another DataFrame, DF2, with hundreds of rows and two columns: name and body. The name column contains a function name, and the body column contains plain Python code: a boolean function that returns true or false. Inside their logic, these functions can refer to any column of a single row from DF1.

func_name | func_body
func1     |   col2 < col45
func2     |   col11.contains("London") and col32*col15 < col21
funcN     |   .... 

I need to join these two DataFrames, DF1 and DF2, and apply each function from DF2 to each row in DF1. Each function must be able to accept parameters from DF1, say a dictionary of key/value pairs that represent the names and values of all columns of the corresponding row from DF1.

I know how to join DF1 and DF2, and I understand that the execution of the Python functions itself will not be distributed. That's fine for now; this is a temporary solution. I just need to distribute all of the rows from DF1 over the worker nodes and apply each Python function to each row of DF1 in different tasks of the Apache Spark application: evaluate the functions with eval() and pass in the dictionary of key/value pairs, as I mentioned above.
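To make the per-row evaluation described above concrete, here is a minimal plain-Python sketch, without Spark. The helper name evaluate_tags, the sample row values, and the function bodies are hypothetical; func2 uses Python's in operator because Python strings have no contains method. Note that eval() runs arbitrary code, so this only makes sense if the function bodies come from a trusted source.

```python
def evaluate_tags(row, functions):
    """Return the names of the functions whose body is truthy for this row.

    row       -- dict mapping column name to value (one row of DF1)
    functions -- iterable of (func_name, func_body) pairs (rows of DF2)
    """
    tags = []
    for name, body in functions:
        # The row dict is passed as the local namespace, so column names
        # resolve to the row's values. Emptying __builtins__ reduces, but
        # does not remove, the risk of evaluating untrusted code.
        if eval(body, {"__builtins__": {}}, dict(row)):
            tags.append(name)
    return tags


# Hypothetical sample data, for illustration only.
row = {"col2": 3, "col11": "London Bridge", "col15": 2,
       "col21": 50, "col32": 10, "col45": 7}
functions = [
    ("func1", "col2 < col45"),
    ("func2", "'London' in col11 and col32 * col15 < col21"),
    ("func3", "col2 > col45"),
]

print(evaluate_tags(row, functions))  # ['func1', 'func2']
```

In a Spark job, a function like this could presumably be called once per row, for example on the dictionary returned by Row.asDict(), with the collected list of functions shipped to the workers.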

In general, each Python function is a tag that I'd like to assign to a row in DF1 if that function returns true. For example, this is the resulting DataFrame DF3:

col1 | col2 | col3 | ... | colN | tags
v11  | v12  | v13  | ... | v1N  | [func1, func76, funcN]
v21  | v22  | v23  | ... | v2N  | [func32]
...  | ...  | ...  | ... | ...  | [..., ..., ..., ..., ...]

Is this possible with PySpark, and if so, could you please show an example of how it can be achieved? Are UDFs that take a Map built from DF.columns as an input parameter the right way to go, or can it be done in some simpler fashion? Does Spark have any limit on the number of UDFs that can be registered at one time?


1 Answer


You can achieve this with SQL expressions, which can be evaluated using expr. This requires the function bodies to be written as Spark SQL expressions rather than Python code. However, you won't be able to join the two DataFrames, because SQL expressions can't be evaluated as column values (see this post), so you have to collect the functions into a list (as there are only hundreds of rows, it fits in memory).

Here is a working example you can adapt for your requirement:

from pyspark.sql.functions import array, expr, lit, when

data1 = [(1, "val1", 4, 5, "A", 10), (0, "val2", 7, 8, "B", 20),
         (9, "val3", 8, 1, "C", 30), (10, "val4", 2, 9, "D", 30),
         (20, "val5", 6, 5, "E", 50), (3, "val6", 100, 2, "X", 45)]

df1 = spark.createDataFrame(data1, ["col1", "col2", "col3", "col4", "col5", "col6"])

data2 = [("func1", "col1 + col3 = 5 and col2 like '%al1'"),
         ("func2", "col6 = 30 or col1 * col4 > 20"),
         ("func3", "col5 in ('A', 'B', 'C') and col6 - col1 < 30"),
         ("func4", "col2 like 'val%' and col1 > 0")]

df2 = spark.createDataFrame(data2, ["func_name", "func_body"])

# get functions into a list
functions = df2.collect()

# case/when expression to evaluate the functions
satisfied_expr = [when(expr(f.func_body), lit(f.func_name)) for f in functions]

# add the new column tags, then drop the nulls left by unsatisfied expressions
df3 = df1.withColumn("tags", array(*satisfied_expr)) \
    .withColumn("tags", expr("filter(tags, x -> x is not null)"))

df3.show(truncate=False)

After adding the array column tags, the filter function is used to remove the null values that correspond to unsatisfied expressions. This function is only available from Spark 2.4 onwards; for older versions you'll have to use a UDF.


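+----+----+----+----+----+----+---------------------+
|col1|col2|col3|col4|col5|col6|tags                 |
+----+----+----+----+----+----+---------------------+
|1   |val1|4   |5   |A   |10  |[func1, func3, func4]|
|0   |val2|7   |8   |B   |20  |[func3]              |
|9   |val3|8   |1   |C   |30  |[func2, func3, func4]|
|10  |val4|2   |9   |D   |30  |[func2, func4]       |
|20  |val5|6   |5   |E   |50  |[func2, func4]       |
|3   |val6|100 |2   |X   |45  |[func4]              |
+----+----+----+----+----+----+---------------------+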
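
For Spark versions before 2.4, where the filter SQL function is not available, the null-removal step could be done with a UDF instead. The following is only a sketch under that assumption: the names drop_nulls and with_clean_tags are hypothetical, and the PySpark imports are kept inside the wrapper so that the plain-Python part can be tried without a Spark installation.

```python
def drop_nulls(tags):
    """Return the non-null entries of a tag list; a null list stays null."""
    if tags is None:
        return None
    return [t for t in tags if t is not None]


def with_clean_tags(df, column="tags"):
    """Return df with the nulls removed from an array<string> column."""
    # Imported lazily so drop_nulls can be used and tested without Spark.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import ArrayType, StringType

    drop_nulls_udf = udf(drop_nulls, ArrayType(StringType()))
    return df.withColumn(column, drop_nulls_udf(col(column)))


# Plain-Python check of the cleaning logic itself.
print(drop_nulls(["func1", None, "func3", "func4"]))  # ['func1', 'func3', 'func4']
```

With this helper, the second withColumn call in the example above would be replaced by with_clean_tags(df1.withColumn("tags", array(*satisfied_expr))). A Python UDF moves each row between the JVM and a Python worker, so it is likely to be slower than the built-in filter function.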