5
votes

As the subject describes, I have a PySpark DataFrame in which I need to melt three columns into rows. Each column essentially represents a single fact in a category. The ultimate goal is to aggregate the data into a single total per category.

There are tens of millions of rows in this dataframe, so I need a way to do the transformation on the spark cluster without bringing back any data to the driver (Jupyter in this case).

Here is an extract of my dataframe for just a few stores:

+-----------+----------------+-----------------+----------------+
|   store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+-----------+----------------+-----------------+----------------+
|        100|              30|              105|              35|
|        200|              55|               85|              65|
|        300|              20|              125|              90|
+-----------+----------------+-----------------+----------------+

Here is the desired resulting dataframe: the three quantity columns of the original dataframe are melted into rows, giving one row per store and original column, with the column's name recorded in a new CATEGORY column:

+-----------+--------+-----------+
|   store_id|CATEGORY|qty_on_hand|
+-----------+--------+-----------+
|        100|    milk|         30|
|        100|   bread|        105|
|        100|    eggs|         35|
|        200|    milk|         55|
|        200|   bread|         85|
|        200|    eggs|         65|
|        300|    milk|         20|
|        300|   bread|        125|
|        300|    eggs|         90|
+-----------+--------+-----------+

Ultimately, I want to aggregate the resulting dataframe to get the totals per category:

+--------+-----------------+
|CATEGORY|total_qty_on_hand|
+--------+-----------------+
|    milk|              105|
|   bread|              315|
|    eggs|              190|
+--------+-----------------+

UPDATE: There is a suggestion that this question is a duplicate and can be answered here. This is not the case, as the solution casts rows to columns and I need to do the reverse, melt columns into rows.

3
Possible duplicate of How to melt Spark DataFrame? - pault

3 Answers

7
votes

We can use the explode() function to solve this. In pandas, the same thing can be done with melt().

# Loading the requisite packages 
from pyspark.sql.functions import col, explode, array, struct, expr, sum, lit        
# Creating the DataFrame
df = sqlContext.createDataFrame([(100,30,105,35),(200,55,85,65),(300,20,125,90)],('store_id','qty_on_hand_milk','qty_on_hand_bread','qty_on_hand_eggs'))
df.show()
+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

The function below explodes this DataFrame:

def to_explode(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("CATEGORY"), col(c).alias("qty_on_hand")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.CATEGORY", "kvs.qty_on_hand"])
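
To make the first two lines of to_explode easier to follow, here is a plain-Python sketch of the unzip and the same-type check. It does not need Spark: sample_dtypes is a hand-written stand-in for df.dtypes on the sample DataFrame (an assumption, based on Spark inferring bigint for Python ints), and is_homogeneous is just an illustrative name.

```python
# Hand-written stand-in for df.dtypes on the sample DataFrame (assumed values)
sample_dtypes = [
    ("store_id", "bigint"),
    ("qty_on_hand_milk", "bigint"),
    ("qty_on_hand_bread", "bigint"),
    ("qty_on_hand_eggs", "bigint"),
]
by = ["store_id"]

# Drop the id columns, then unzip the (name, type) pairs into two tuples
cols, dtypes = zip(*((c, t) for (c, t) in sample_dtypes if c not in by))

# A single distinct type means all melted values can share one value column
is_homogeneous = len(set(dtypes)) == 1

print(cols)
print(is_homogeneous)
```

If one of the quantity columns had a different type, the set would contain two entries and the assertion in to_explode would fail before any Spark work is done.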

Applying the function to this DataFrame to explode it:

df = to_explode(df, ['store_id'])\
     .drop('store_id')
df.show()
+-----------------+-----------+
|         CATEGORY|qty_on_hand|
+-----------------+-----------+
| qty_on_hand_milk|         30|
|qty_on_hand_bread|        105|
| qty_on_hand_eggs|         35|
| qty_on_hand_milk|         55|
|qty_on_hand_bread|         85|
| qty_on_hand_eggs|         65|
| qty_on_hand_milk|         20|
|qty_on_hand_bread|        125|
| qty_on_hand_eggs|         90|
+-----------------+-----------+

Now we need to remove the prefix qty_on_hand_ from the CATEGORY column. This can be done with the expr() function. Note that substring inside expr uses 1-based indexing, not 0-based:

df = df.withColumn('CATEGORY',expr('substring(CATEGORY, 13)'))
df.show()
+--------+-----------+
|CATEGORY|qty_on_hand|
+--------+-----------+
|    milk|         30|
|   bread|        105|
|    eggs|         35|
|    milk|         55|
|   bread|         85|
|    eggs|         65|
|    milk|         20|
|   bread|        125|
|    eggs|         90|
+--------+-----------+
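
As a sanity check on the start position 13, here is a small plain-Python sketch of the 1-based indexing. The helper sql_substring is hypothetical and only mimics the two-argument form for positive positions; it is not part of PySpark.

```python
PREFIX = "qty_on_hand_"

def sql_substring(s, pos):
    """Mimic SQL substring(s, pos) for pos >= 1, where positions count from 1."""
    return s[pos - 1:]

# The prefix has 12 characters, so the category name starts at position 13
start = len(PREFIX) + 1
category = sql_substring("qty_on_hand_bread", start)

print(start, category)
```

Deriving the position from the prefix length avoids a hard-coded 13 if the column naming ever changes.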

Finally, aggregate the column qty_on_hand grouped by CATEGORY using the agg() function:

df = df.groupBy(['CATEGORY']).agg(sum('qty_on_hand').alias('total_qty_on_hand'))
df.show()
+--------+-----------------+
|CATEGORY|total_qty_on_hand|
+--------+-----------------+
|    eggs|              190|
|   bread|              315|
|    milk|              105|
+--------+-----------------+
3
votes

I think you should use array and explode to do this; you do not need any complex logic with UDFs or custom functions.

array will combine columns into a single column, or annotate columns.

explode will convert an array column into a set of rows.

All you need to do is:

  • annotate each column with your custom label (e.g. 'milk')
  • combine your labelled columns into a single column of 'array' type
  • explode the labels column to generate labelled rows
  • drop irrelevant columns

import pyspark.sql.functions as F

df = (
    df.withColumn('labels', F.explode(                         # <-- Split into rows
        F.array(                                               # <-- Combine columns
            F.array(F.lit('milk'), F.col('qty_on_hand_milk')), # <-- Annotate column
            F.array(F.lit('bread'), F.col('qty_on_hand_bread')),
            F.array(F.lit('eggs'), F.col('qty_on_hand_eggs')),
        )
    ))
    .withColumn('CATEGORY', F.col('labels')[0])
    .withColumn('qty_on_hand', F.col('labels')[1])
).select('store_id', 'CATEGORY', 'qty_on_hand')

Note how you can pull out elements of an array column simply using col('foo')[INDEX]; there is no specific need to split them out into separate columns.

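This approach also tolerates columns of different data types, because array() coerces all of its elements to a common type instead of requiring the columns to match up front (as the struct-based approach with its same-type assertion does). The trade-off is that qty_on_hand shares an array with the string label, so it comes back as a string.

E.g. if 'qty_on_hand_bread' is a string, this still works; the resulting schema will just be: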

root
 |-- store_id: long (nullable = false)
 |-- CATEGORY: string (nullable = true)
 |-- qty_on_hand: string (nullable = true) <-- Picks best schema on the fly

Here is the same code, step by step, to make it obvious what's happening:

import databricks.koalas as ks
import pyspark.sql.functions as F

# You don't need koalas, it's just less verbose for adhoc dataframes
df = ks.DataFrame({
    "store_id": [100, 200, 300],
    "qty_on_hand_milk": [30, 55, 20],
    "qty_on_hand_bread": [105, 85, 125],
    "qty_on_hand_eggs": [35, 65, 90],
}).to_spark()
df.show()

# Annotate each column with your custom label per row. ie. v -> ['label', v]
df = df.withColumn('label1', F.array(F.lit('milk'), F.col('qty_on_hand_milk')))
df = df.withColumn('label2', F.array(F.lit('bread'), F.col('qty_on_hand_bread')))
df = df.withColumn('label3', F.array(F.lit('eggs'), F.col('qty_on_hand_eggs')))
df.show()

# Create a new column which combines the labeled values in a single column
df = df.withColumn('labels', F.array('label1', 'label2', 'label3'))
df.show()

# Split into individual rows
df = df.withColumn('labels', F.explode('labels'))
df.show()

# You can now do whatever you want with your labelled rows, eg. split them into new columns
df = df.withColumn('CATEGORY', F.col('labels')[0])
df = df.withColumn('qty_on_hand', F.col('labels')[1])
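df.show()

# Keep only the columns of interest
df = df.select('store_id', 'CATEGORY', 'qty_on_hand')
df.show()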

...and the output from each step:

+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

+--------+----------------+-----------------+----------------+----------+------------+----------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|
+--------+----------------+-----------------+----------------+----------+------------+----------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|
+--------+----------------+-----------------+----------------+----------+------------+----------+

+--------+----------------+-----------------+----------------+----------+------------+----------+--------------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|              labels|
+--------+----------------+-----------------+----------------+----------+------------+----------+--------------------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|[[milk, 30], [bre...|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|[[milk, 55], [bre...|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|[[milk, 20], [bre...|
+--------+----------------+-----------------+----------------+----------+------------+----------+--------------------+

+--------+----------------+-----------------+----------------+----------+------------+----------+------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|      labels|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [milk, 30]|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|[bread, 105]|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [eggs, 35]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [milk, 55]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]| [bread, 85]|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [eggs, 65]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [milk, 20]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|[bread, 125]|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [eggs, 90]|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+

+--------+----------------+-----------------+----------------+----------+------------+----------+------------+--------+-----------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|    label1|      label2|    label3|      labels|CATEGORY|qty_on_hand|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+--------+-----------+
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [milk, 30]|    milk|         30|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|[bread, 105]|   bread|        105|
|     100|              30|              105|              35|[milk, 30]|[bread, 105]|[eggs, 35]|  [eggs, 35]|    eggs|         35|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [milk, 55]|    milk|         55|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]| [bread, 85]|   bread|         85|
|     200|              55|               85|              65|[milk, 55]| [bread, 85]|[eggs, 65]|  [eggs, 65]|    eggs|         65|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [milk, 20]|    milk|         20|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|[bread, 125]|   bread|        125|
|     300|              20|              125|              90|[milk, 20]|[bread, 125]|[eggs, 90]|  [eggs, 90]|    eggs|         90|
+--------+----------------+-----------------+----------------+----------+------------+----------+------------+--------+-----------+

+--------+--------+-----------+
|store_id|CATEGORY|qty_on_hand|
+--------+--------+-----------+
|     100|    milk|         30|
|     100|   bread|        105|
|     100|    eggs|         35|
|     200|    milk|         55|
|     200|   bread|         85|
|     200|    eggs|         65|
|     300|    milk|         20|
|     300|   bread|        125|
|     300|    eggs|         90|
+--------+--------+-----------+
0
votes

A possible way of doing this uses col, when and posexplode from the pyspark.sql.functions module:

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import StringType
>>> concat_udf = F.udf(lambda cols: ",".join([str(x) if x is not None else "*" for x in cols]), StringType())

>>> rdd = sc.parallelize([[100,30,105,35],[200,55,85,65],[300,20,125,90]])
>>> df = rdd.toDF(['store_id','qty_on_hand_milk','qty_on_hand_bread','qty_on_hand_eggs'])

>>> df.show()
+--------+----------------+-----------------+----------------+
|store_id|qty_on_hand_milk|qty_on_hand_bread|qty_on_hand_eggs|
+--------+----------------+-----------------+----------------+
|     100|              30|              105|              35|
|     200|              55|               85|              65|
|     300|              20|              125|              90|
+--------+----------------+-----------------+----------------+

#adding one more column that joins the values of all three columns into a single string
>>> df_1=df.withColumn("new_col", concat_udf(F.array("qty_on_hand_milk", "qty_on_hand_bread","qty_on_hand_eggs")))
#split the string and convert it into array<int> for carrying out agg operations
>>> df_2=df_1.withColumn("new_col_1",split(col("new_col"), r",\s*").cast("array<int>"))
#posexplode gives you the position along with usual explode which helps in categorizing
>>> df_3=df_2.select("store_id",  posexplode("new_col_1").alias("col_1","qty"))
#if else conditioning for category column
>>> df_3.withColumn("category",F.when(col("col_1") == 0, "milk").when(col("col_1") == 1, "bread").otherwise("eggs")).select("store_id","category","qty").show()
+--------+--------+---+
|store_id|category|qty|
+--------+--------+---+
|     100|    milk| 30|
|     100|   bread|105|
|     100|    eggs| 35|
|     200|    milk| 55|
|     200|   bread| 85|
|     200|    eggs| 65|
|     300|    milk| 20|
|     300|   bread|125|
|     300|    eggs| 90|
+--------+--------+---+

#aggregating to find sum
>>> df_3.withColumn("category",F.when(col("col_1") == 0, "milk").when(col("col_1") == 1, "bread").otherwise("eggs")).select("category","qty").groupBy('category').sum().show()
+--------+--------+
|category|sum(qty)|
+--------+--------+
|    eggs|     190|
|   bread|     315|
|    milk|     105|
+--------+--------+
>>> df_3.printSchema()
root
 |-- store_id: long (nullable = true)
 |-- col_1: integer (nullable = false)
 |-- qty: integer (nullable = true)