
I have a PySpark DataFrame with two columns (A and B, whose type is double) whose values are either 0.0 or 1.0. I am trying to add a new column that is the sum of those two. I followed the examples in Pyspark: Pass multiple columns in UDF:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

sum_cols = F.udf(lambda x: x[0] + x[1], IntegerType())
df_with_sum = df.withColumn('SUM_COL', sum_cols(F.array('A', 'B')))
df_with_sum.select(['SUM_COL']).toPandas()

This shows a Series of NULLs instead of the results I expect.

I tried each of the following to see whether there is an issue with the data types:

sum_cols = F.udf(lambda x: x[0], IntegerType())
sum_cols = F.udf(lambda x: int(x[0]), IntegerType())

I still got NULLs.

I tried removing the array:

sum_cols = F.udf(lambda x: x, IntegerType())
df_with_sum = df.withColumn('SUM_COL', sum_cols(df.A))

This works fine and shows 0/1.

I tried removing the UDF, but leaving the array:

df_with_sum = df.withColumn('SUM_COL', F.array('A', 'B'))

This works fine and shows a Series of arrays of the form [0.0/1.0, 0.0/1.0].

So the array works fine and the UDF works fine; it is only when I try to pass an array to the UDF that things break down. What am I doing wrong?

What are the datatypes of columns A and B? Can you check that and update the question? – Ramesh Maharjan
@RameshMaharjan Yes, updated! The type is double. – eran
How is 0/1 a double? Is it 0/1 or 0.1? – Ramesh Maharjan
@RameshMaharjan It is 0.0 or 1.0 for A and B; the output should be 0, 1 or 2 (depending on what the operation is; I showed several examples: in one of them I call int() within the UDF and the UDF return type is IntegerType, in another I don't use a UDF at all, so it is 0.0 or 1.0). – eran
I am not understanding the form 0.0/1.0; it should be 0.0 if the datatype is double. If the value is "0.0/1.0" then the datatype should be StringType, shouldn't it? – Ramesh Maharjan

1 Answer


The problem is that you are trying to return a double from a function that is declared to return an integer. The types do not match, and PySpark by default silently falls back to NULL when a cast fails:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

df_with_doubles = spark.createDataFrame([(1.0, 1.0), (2.0, 2.0)], ['A', 'B'])
sum_cols = F.udf(lambda x: x[0] + x[1], IntegerType())
df_with_sum = df_with_doubles.withColumn('SUM_COL', sum_cols(F.array('A', 'B')))
df_with_sum.select(['SUM_COL']).toPandas()

You get:

  SUM_COL
0    None
1    None

However, if you do:

df_with_integers = spark.createDataFrame([(1, 1), (2, 2)], ['A', 'B'])
sum_cols = F.udf(lambda x: x[0] + x[1], IntegerType())
df_with_sum = df_with_integers.withColumn('SUM_COL', sum_cols(F.array('A', 'B')))
df_with_sum.select(['SUM_COL']).toPandas()

You get:

   SUM_COL
0        2
1        4

So, either cast your columns to IntegerType beforehand (or cast them in the UDF), or change the return type of the UDF to DoubleType.
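
For reference, here is a minimal sketch of those three fixes, assuming the same df with double columns A and B as in the question:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType

# Fix 1: cast the double columns to integers before applying the UDF
df_int = df.withColumn('A', df['A'].cast(IntegerType())) \
           .withColumn('B', df['B'].cast(IntegerType()))
sum_cols = F.udf(lambda x: x[0] + x[1], IntegerType())
df_with_sum = df_int.withColumn('SUM_COL', sum_cols(F.array('A', 'B')))

# Fix 2: keep the double columns, but cast the sum to int inside the UDF
sum_cols = F.udf(lambda x: int(x[0] + x[1]), IntegerType())

# Fix 3: keep the double sum and declare the return type as DoubleType
sum_cols = F.udf(lambda x: x[0] + x[1], DoubleType())

Any of the three avoids the silent cast failure; which one to pick depends on whether you want SUM_COL to end up as an integer or a double.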