
I'm running into an issue when mixing Python's map and lambda functions in a Spark environment.

Given df1, my source dataframe:

Animals     | Food      | Home
----------------------------------
Monkey      | Banana    | Jungle
Dog         | Meat      | Garden
Cat         | Fish      | House
Elephant    | Banana    | Jungle
Lion        | Meat      | Desert

I want to create another dataframe, df2, with two columns and one row per column of df1 (3 in my example). The first column contains the name of the df1 column. The second contains an array of the n most frequent values in that column (n=3 in the example below), each paired with its count.

Column      | Content
-----------------------------------------------------------
Animals     | [("Cat", 1), ("Dog", 1), ("Elephant", 1)]
Food        | [("Banana", 2), ("Meat", 2), ("Fish", 1)]
Home        | [("Jungle", 2), ("Desert", 1), ("Garden", 1)]

I tried to do it with Python list, map and lambda functions, but they conflict with the PySpark functions:

def transform(df1):
    # Number of entries to keep per row
    n = 3
    # Add a column used to count occurrences
    df1 = df1.withColumn("future_occurences", F.lit(1))

    df2 = df1.withColumn("Content",
        F.array(
            F.create_map(
                lambda x: (x,
                    [
                        str(row[x]) for row in df1.groupBy(x).agg(
                            F.sum("future_occurences").alias("occurences")
                        ).orderBy(
                            F.desc("occurences")
                        ).select(x).limit(n).collect()
                    ]
                ), df1.columns
            )
        )
    )
    return df2

The error is:

TypeError: Invalid argument, not a string or column: <function <lambda> at 0x7fc844430410> of type <type 'function'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Any idea how to fix it?

Thanks a lot!

This can be done, but it's not really the type of problem that Spark is designed for. You can treat each column independently and union the results. How do you break ties? Why is it Cat, Dog, Elephant when the other two animals also have a count of 1? - pault
@PentaKill I prefer to post my code to illustrate the problem I'm facing. I don't understand why you say it is useless. - Maxbester
@pault thanks for your comment. I'm new to Spark, so I still need to learn. Yes, I guess I could treat each column independently, but I wasn't sure it was the best solution. I break ties alphabetically, which is why Lion and Monkey aren't displayed. - Maxbester

1 Answer


Here is one possible solution, in which the Content column will be an array of StructType with two named fields: Content and count.

from pyspark.sql.functions import col, collect_list, desc, lit, struct
from functools import reduce 

def transform(df, n):
    """For each column of df, return one row holding its n most frequent values and their counts."""
    return reduce(
        lambda a, b: a.unionAll(b),
        (
            df.groupBy(c).count()\
                .orderBy(desc("count"), c)\
                .limit(n)\
                .withColumn("Column", lit(c))\
                .groupBy("Column")\
                .agg(
                    collect_list(
                        struct(
                            col(c).cast("string").alias("Content"), 
                            "count")
                    ).alias("Content")
                )
            for c in df.columns
        )
    )

This function iterates over each column of the input DataFrame, df, and counts the occurrences of each value. It then orders by the count (descending) and by the column value itself (alphabetically) to break ties, and keeps only the first n rows (limit(n)).

Next, it collects the values into an array of structs and finally unions the results for each column. Since the union requires every DataFrame to have the same schema, the column value is cast to a string.
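
To make the ordering and tie-breaking rule concrete without a Spark session, here is a rough plain-Python sketch of the same logic. It is only an analogue, not the PySpark code: top_n_per_column is a hypothetical helper, the rows are the sample data from the question, and values are converted to strings up front rather than after ordering.

```python
from collections import Counter

def top_n_per_column(rows, columns, n):
    """Hypothetical plain-Python analogue of transform(df, n)."""
    result = {}
    for idx, name in enumerate(columns):
        # Count how often each value appears in this column
        counts = Counter(str(row[idx]) for row in rows)
        # Order by count (descending), then by value (ascending) to break ties
        ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
        # Keep only the first n (value, count) pairs
        result[name] = ranked[:n]
    return result

# Sample data copied from the question
columns = ["Animals", "Food", "Home"]
rows = [
    ("Monkey", "Banana", "Jungle"),
    ("Dog", "Meat", "Garden"),
    ("Cat", "Fish", "House"),
    ("Elephant", "Banana", "Jungle"),
    ("Lion", "Meat", "Desert"),
]

top3 = top_n_per_column(rows, columns, 3)
for name in columns:
    print(name, top3[name])
```

Sorting on the key (-count, value) plays the role of orderBy(desc("count"), c), and the slice plays the role of limit(n).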

n = 3
df1 = transform(df, n)
df1.show(truncate=False)
#+-------+------------------------------------+
#|Column |Content                             |
#+-------+------------------------------------+
#|Animals|[[Cat,1], [Dog,1], [Elephant,1]]    |
#|Food   |[[Banana,2], [Meat,2], [Fish,1]]    |
#|Home   |[[Jungle,2], [Desert,1], [Garden,1]]|
#+-------+------------------------------------+

This isn't exactly the output you asked for, but it will probably be sufficient for your needs. (Spark has no tuple type like the one you described, so each pair is a struct instead.) Here's the new schema:

df1.printSchema()
#root
# |-- Column: string (nullable = false)
# |-- Content: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- Content: string (nullable = true)
# |    |    |-- count: long (nullable = false)
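
If you do need Python tuples on the driver side, one option is to convert the structs after collecting. The sketch below is an assumption-laden illustration: content_to_tuples is a hypothetical helper, and plain dicts stand in for the Row objects that collect() would return, on the assumption that fields are read by name with item["..."].

```python
def content_to_tuples(content):
    """Hypothetical helper: turn an array of {Content, count} structs into tuples."""
    # Each item is read by field name, matching the schema shown above
    return [(item["Content"], item["count"]) for item in content]

# Stand-in for one collected row (with Spark this would come from df1.collect())
sample_row = {
    "Column": "Food",
    "Content": [
        {"Content": "Banana", "count": 2},
        {"Content": "Meat", "count": 2},
        {"Content": "Fish", "count": 1},
    ],
}

food = content_to_tuples(sample_row["Content"])
print(sample_row["Column"], food)
```

This only makes sense once the result is small enough to collect, which is the case here since there is one row per column and at most n entries per row.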