Using PySpark 2.2

I have a Spark DataFrame with multiple columns. I need to pass 2 columns to a UDF and have it return a 3rd column.

Input:

+-----+------+
|col_A| col_B|
+-----+------+
|  abc|abcdef|
|  abc|     a|
+-----+------+

Both col_A and col_B are StringType().

Desired output:

+-----+------+-------+
|col_A| col_B|new_col|
+-----+------+-------+
|  abc|abcdef|    abc|
|  abc|     a|      a|
+-----+------+-------+

I want new_col to be a substring of col_A with the length of col_B.

I tried

udf_substring = F.udf(lambda x: F.substring(x[0],0,F.length(x[1])), StringType())
df.withColumn('new_col', udf_substring([F.col('col_A'),F.col('col_B')])).show()

But it raises TypeError: Column is not iterable.

Any idea how to do such manipulation?

1 Answer

There are two major things wrong here.

  • First, you defined your udf to take one input parameter when it should take two.
  • Second, you can't use the Spark API functions inside the udf. A udf executes row by row in Python, so you have to use plain Python syntax and functions, not Column expressions.

Here's a proper udf implementation for this problem:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

def my_substring(a, b):
    # Return the prefix of a whose length matches b.
    # You should add in your own error checking (e.g. for None values).
    return a[:len(b)]

udf_substring = F.udf(lambda x, y: my_substring(x, y), StringType())

And then call it by passing in the two columns as arguments:

df.withColumn('new_col', udf_substring(F.col('col_A'),F.col('col_B')))
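
As a quick sanity check, here's a minimal end-to-end sketch, assuming an active SparkSession named spark and the definitions above:

df = spark.createDataFrame([('abc', 'abcdef'), ('abc', 'a')], ['col_A', 'col_B'])
df.withColumn('new_col', udf_substring(F.col('col_A'), F.col('col_B'))).show()
#+-----+------+-------+
#|col_A| col_B|new_col|
#+-----+------+-------+
#|  abc|abcdef|    abc|
#|  abc|     a|      a|
#+-----+------+-------+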

However, in this case you can do this without a udf using the method described in this post.

df.withColumn(
    'new_col', 
    F.expr("substring(col_A,0,length(col_B))")
)
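
The detour through F.expr is needed because in PySpark 2.2, F.substring(str, pos, len) only accepts plain Python integers for pos and len, not Columns. Note also that SQL's substring is 1-based; Spark happens to treat a starting position of 0 the same as 1, so the expression above works, but the equivalent form with an explicit 1 is arguably clearer:

df.withColumn(
    'new_col',
    F.expr("substring(col_A, 1, length(col_B))")
)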