2
votes

I have a piece of code which works well but uses pandas data frame groupby processing. However because the file is large ( > 70 million groups I need to convert the code to use PYSPARK data frame. Here is the original code using pandas dataframe with small example data:

import pandas as pd
import numpy as np
from scipy.optimize import minimize

df = pd.DataFrame({
'y0': np.random.randn(20),
'y1': np.random.randn(20),
'x0': np.random.randn(20), 
'x1': np.random.randn(20),
'grpVar': ['a', 'b'] * 10})

# Starting values
startVal = np.ones(2)*(1/2)

#Constraint  Sum of coefficients = 0
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})

# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

# Define a function to calculate sum of squared differences
def SumSqDif(a, df):
    return np.sum((df['y0'] - a[0]*df['x0'])**2 + (df['y1'] - a[1]*df['x1'])  **2)

# Define a function to call minimize function 
def RunMinimize(data, startVal, bnds, cons):
    ResultByGrp = minimize(SumSqDif, startVal, method='SLSQP',
    bounds=bnds, constraints = cons, args=(data))
return ResultByGrp.x

# Do the calculation by applyng the function by group:
# Create GroupBy object
grp_grpVar = df.groupby('grpVar')

Results = grp_grpVar.apply(RunMinimize, startVal=startVal, bnds=bnds, cons=cons))

Now I am trying to use pySpark dataframe I convert pandas dataframe to pyspark dataframe for the purpose of testing code.

sdf = sqlContext.createDataFrame(df)
type(sdf)
#  <class 'pyspark.sql.dataframe.DataFrame'>

# Create GroupBy object
Sgrp_grpVar = sdf.groupby('grpVar')

# Redefine functions
def sSumSqDif(a, sdf):
    return np.sum((sdf['y0'] - a[0]*sdf['x0'])**2 + (sdf['y1'] - a[1]*sdf['x1'])**2)

def sRunMinimize(data=sdf, startVal=startVal, bnds=bnds, cons=cons):
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                       bounds=bnds, constraints = cons, args=(data))
return ResultByGrp.x

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType

udf = UserDefinedFunction(sRunMinimize , StringType())

Results = Sgrp_grpVar.agg(sRunMinimize()) 

However after I tried to define the user defined function udf I got the following errors - see below. Any help correcting my errors or suggesting an alternative approach is highly appreciated.

udf = UserDefinedFunction(sRunMinimize , StringType()) Traceback (most recent call last): File "", line 1, in File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1760, in init self._judf = self._create_judf(name).......

1
My first observation is that you can't send an entire Spark dataframe as an argument to a udf, only columns of a dataframe.femibyte

1 Answers

3
votes

You're trying to write a User Defined Aggregate Function which can't be done in pyspark see https://stackoverflow.com/a/40030740.

What you can write instead is a UDF on the data within each group collected as a list:

First for the set-up:

import pandas as pd 
import numpy as np 
from scipy.optimize import minimize
import pyspark.sql.functions as psf
from pyspark.sql.types import *

df = pd.DataFrame({
    'y0': np.random.randn(20),
    'y1': np.random.randn(20),
    'x0': np.random.randn(20), 
    'x1': np.random.randn(20),
    'grpVar': ['a', 'b'] * 10})
sdf = sqlContext.createDataFrame(df)

# Starting values
startVal = np.ones(2)*(1/2)
#Constraint  Sum of coefficients = 0
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})
# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

We'll broadcast these variables since we need to call them on every row of the aggregated dataframe, it will copy the values to every node so they don't have to go get them on the driver:

sc.broadcast(startVal)
sc.broadcast(bnds)

Let's aggregate the data using collect_list, we'll change the structure of the data around so we only have one column (you can collect each column into distinct columns but then you'd have to modify the way you pass data to the function):

Sgrp_grpVar = sdf\
    .groupby('grpVar')\
    .agg(psf.collect_list(psf.struct("y0", "y1", "x0", "x1")).alias("data"))
Sgrp_grpVar.printSchema()

    root
     |-- grpVar: string (nullable = true)
     |-- data: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- y0: double (nullable = true)
     |    |    |-- y1: double (nullable = true)
     |    |    |-- x0: double (nullable = true)
     |    |    |-- x1: double (nullable = true)

We can now create our UDF, the returned data type is too complex for pyspark, numpy arrays are not supported by pyspark so we'll need to change it a bit:

def sSumSqDif(a, data):
    return np.sum(
        (data['y0'] - a[0]*data['x0'])**2 \
        + (data['y1'] - a[1]*data['x1'])**2)

def sRunMinimize(data, startVal=startVal, bnds=bnds, cons=cons):
    data = pd.DataFrame({k:v for k,v in zip(["y0", "y1", "x0", "x1"], data)})
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                       bounds=bnds, constraints = cons, args=(data))
    return ResultByGrp.x.tolist()

sRunMinimize_udf = lambda startVal, bnds, cons: psf.udf(
    lambda data: sRunMinimize(data, startVal, bnds, cons), 
    ArrayType(DoubleType())
)

We can now apply this function to the collected data in each group:

Results = sdf_agg.select(
    "grpVar", 
    sRunMinimize_udf(startVal, bnds, cons)("data").alias("res")
)
Results.show(truncate=False)

    +------+-----------------------------------------+
    |grpVar|res                                      |
    +------+-----------------------------------------+
    |b     |[0.4073139282953772, 0.5926860717046227] |
    |a     |[0.8275186444565927, 0.17248135554340727]|
    +------+-----------------------------------------+

But I don't think pyspark is the right tool for this.