3
votes

I have written a UDF. It is very slow. I would like to replace it with a pandas_udf to take advantage of vectorization.

The actual udf is a bit more complicated, but I have created a simplified toy version of it.

My question: is it possible to replace the UDF in my toy example with a pandas_udf that would take advantage of vectorization? If not, why not?

P.S: I know I could achieve the same effect without a UDF. That is because I simplified the example, but that is not my goal.

from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

#Example data
df = spark.createDataFrame(pd.DataFrame({ 'Letter': [['A', 'A', 'C'], ['A', 'C', 'A', 'D']],
                                          'Number': [[2, 1, 1], [3, 1, 1, 2]],
                                        })
                          )

# The UDF I hope to replace with a pandas_udf
@f.udf(ArrayType(StringType()))
def array_func(le, nr):
    res=[]
    for i in range(len(nr)):
        if nr[i]==1:
            res.append(le[i])
        else:
            res.append('Nope')
    return res

# Applying the udf
df = df.withColumn('udf', array_func('Letter','Number'))
df.show()
2

2 Answers

3
votes

How about this?

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

#Example data
df = spark.createDataFrame(pd.DataFrame({ 'Letter': [['A', 'A', 'C'], ['A', 'C', 'A', 'D']],
                                          'Number': [[2, 1, 1], [3, 1, 1, 2]],
                                        })
                          )
df.show()

# Add a dummy column so you can use groupby
df = df.withColumn('id', F.lit(1))
schm = StructType(df.schema.fields + [StructField('udf', ArrayType(StringType()), True)])
@pandas_udf(schm, PandasUDFType.GROUPED_MAP)
def array_udf(pdf):
    res=[]
    for ls, ns in zip(pdf['Letter'], pdf['Number']):
        r = [l if n == 1 else 'Nope' for l, n in zip(ls, ns)]
        res.append(r)
    pdf['udf'] = res
    return pdf

df = df.groupby('id').apply(array_udf).drop('id')
df.show()

The output:

+------------+------------+------------------+
|      Letter|      Number|               udf|
+------------+------------+------------------+
|   [A, A, C]|   [2, 1, 1]|      [Nope, A, C]|
|[A, C, A, D]|[3, 1, 1, 2]|[Nope, C, A, Nope]|
+------------+------------+------------------+

1
votes

I've created a new function named array_func_pd using pandas_udf, just to differentiate the original array_func, so that you have both functions to compare and play around.

from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

@f.pandas_udf(ArrayType(StringType()))
def array_func_pd(le, nr):
"""
   le:  pandas.Series< numpy.ndarray<string> >
   nr:  pandas.Series< numpy.ndarray<int> >

   return: pd.Series< list<string> >
"""
    res=[]
    for i, (l_lst, n_lst) in enumerate(zip(le, nr)):
        ret_lst = []
        res.append(ret_lst)
        l_lst2 = l_lst.tolist()
        n_lst2 = n_lst.tolist()
        for j,(l, n) in enumerate(zip(l_lst2, n_lst2)):
            if n == 1:
                ret_lst.append(l)
            else:
                ret_lst.append('Nope')
    return pd.Series(res)

# Applying the udf
df = df.withColumn('udf', array_func_pd('Letter','Number'))
df.show()

And here is the output:

+------------+------------+------------------+
|      Letter|      Number|               udf|
+------------+------------+------------------+
|   [A, A, C]|   [2, 1, 1]|      [Nope, A, C]|
|[A, C, A, D]|[3, 1, 1, 2]|[Nope, C, A, Nope]|
+------------+------------+------------------+

There are two types of Pandas UDFs(aka Vectorized UDFs). For your case, I think it is best to keep it simple and use Scalar Pandas UDF.

Here is the notes for Scalar Pandas UDF from official document:

The Python function should take pandas.Series as inputs and return a pandas.Series of the same length. Internally, Spark will execute a Pandas UDF by splitting columns into batches and calling the function for each batch as a subset of the data, then concatenating the results together.

So in my code:

The output of the udf should be pd.Series, and should share the same count with le or nr.