
I am trying to implement a UDF in Spark that can take both a literal and a column as arguments. To achieve this, I believe I can use a curried UDF.

The function matches a string literal against each value in a column of a DataFrame. I have summarized the code below:

def matching(match_string_1):
    def matching_inner(match_string_2):
        return difflib.SequenceMatcher(None, match_string_1, match_string_2).ratio()
    return matching

hc.udf.register("matching", matching)
matching_udf = F.udf(matching, StringType())

df_matched = df.withColumn("matching_score", matching_udf(lit("match_string"))(df.column))
  • "match_string" is actually a value assigned to a list which I am iterating over.

Unfortunately, this is not working as I had hoped, and I am receiving

"TypeError: 'Column' object is not callable".

I believe I am not calling this function correctly.


1 Answer


It should be something like this:

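import difflib

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def matching(match_string_1):
    # match_string_1 is a plain Python string captured by the closure.
    def matching_inner(match_string_2):
        return difflib.SequenceMatcher(
            a=match_string_1, b=match_string_2).ratio()

    # Create the UDF here, from the inner function.
    # ratio() returns a float; DoubleType() would keep the column numeric.
    return F.udf(matching_inner, StringType())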

df.withColumn("matching_score", matching("match_string")(df.column))
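The inner function is ordinary Python, so the currying can be checked without Spark before it is wrapped in F.udf. The sketch below is a minimal illustration: the name make_matcher and the None check are additions, not part of the original code. The check is there because null column values reach a Python UDF as None, which SequenceMatcher cannot compare.

```python
import difflib


def make_matcher(match_string_1):
    """Return a one-argument function that scores strings against match_string_1."""
    def matching_inner(match_string_2):
        # Assumption: nulls in the column should yield a null score.
        if match_string_2 is None:
            return None
        return difflib.SequenceMatcher(
            a=match_string_1, b=match_string_2).ratio()
    return matching_inner


# The outer call fixes the literal; the inner call scores one value.
score = make_matcher("match_string")
print(score("match_string"))  # identical strings
print(score("xyz"))           # no characters in common
print(score(None))            # null input
```

Passing make_matcher("match_string") to F.udf would then give the same per-row behavior as the version above.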

If you want to support a Column argument for match_string_1, you'll have to rewrite it like this:

def matching(match_string_1):
    def matching_inner(match_string_2):
        return F.udf(
            lambda a, b: difflib.SequenceMatcher(a=a, b=b).ratio(),
            StringType())(match_string_1, match_string_2)

    return matching_inner

df.withColumn("matching_score", matching(F.lit("match_string"))(df.column))

Your current code doesn't work because matching_udf is a UDF, so matching_udf(lit("match_string")) creates a Column expression instead of calling the inner function. Calling that Column with (df.column) is what raises the TypeError.