1
votes

I have a Spark dataframe in Python and it is in a specific order where the rows can be sectioned into the right groups, according to a column "start_of_section" which has values 1 or 0. For each collection of rows that need to be grouped together, every column other than "value" and "start_of_section" is equal. I want to group each such collection into one row that has the same values for every other column and a column "list_values" which has an array of all the values in each row.

So some rows might look like:

Row(category=fruit, object=apple, value=60, start_of_section=1)
Row(category=fruit, object=apple, value=160, start_of_section=0)
Row(category=fruit, object=apple, value=30, start_of_section=0)

and in the new dataframe this would be

Row(category=fruit, object=apple, list_values=[60, 160, 30])

(Edit: note that the column "start_of_section" should not have been included in the final dataframe.)

The issue I've had in trying to research the answer is that I've only found ways of grouping by column value without regard for ordering, so that this would wrongly produce two rows, one grouping all rows with "start_of_section"=1 and one grouping all rows with "start_of_section"=0..

What code can achieve this?

2
how do you define the order ? you need an "order" columnSteven
Is it sufficient for it to use the order that I see when using "show"? Otherwise can I create a new column with an incrementing counter based on its current position in the dataframe?A.M.
Give us your order column. We need it to define when the section starts.Steven
Both Lamanus and I are giving the same answer. just be sure you have an "order" column and it should work. Otherwise, you're doomed.Steven

2 Answers

0
votes

Assuming your order column is order_col

df.show()
+--------+------+---------+----------------+-----+
|category|object|order_col|start_of_section|value|
+--------+------+---------+----------------+-----+
|   fruit| apple|        1|               1|   60|
|   fruit| apple|        2|               0|  160|
|   fruit| apple|        3|               0|   30|
|   fruit| apple|        4|               1|   50|
+--------+------+---------+----------------+-----+

you need to generate an id to group the lines in the same section together, then group by this id and the dimension you want. Here is how you do it.

from pyspark.sql import functions as F, Window as W

df.withColumn(
    "id",
    F.sum("start_of_section").over(
        W.partitionBy("category", "object").orderBy("order_col")
    ),
).groupBy("category", "object", "id").agg(F.collect_list("value").alias("values")).drop(
    "id"
).show()

+--------+------+-------------+
|category|object|       values|
+--------+------+-------------+
|   fruit| apple|[60, 160, 30]|
|   fruit| apple|         [50]|
+--------+------+-------------+

EDIT: If you do not have any order_col, it is an impossible task to do. See your lines in a dataframe as marble in a bag. They do not have any order. You can order them as you pull them out of the bag according to some criteria, but otherwise, you cannot assume any order. show is just you pulling 10 marbles (lines) out of the bag. The order may be the same each time you do it, but suddently change, and you have no controle on it

0
votes

Well, now I got it. You can do a group by with the column that summing the start_of_section.

In order to make sure about the result, you should include the ordering column.

from pyspark.sql.types import Row
from pyspark.sql.functions import *
from pyspark.sql import Window

data = [Row(category='fruit', object='apple', value=60, start_of_section=1),
    Row(category='fruit', object='apple', value=160, start_of_section=0),
    Row(category='fruit', object='apple', value=30, start_of_section=0),
    Row(category='fruit', object='apple', value=50, start_of_section=1),
    Row(category='fruit', object='apple', value=30, start_of_section=0),
    Row(category='fruit', object='apple', value=60, start_of_section=1),
    Row(category='fruit', object='apple', value=110, start_of_section=0)]

df = spark.createDataFrame(data)

w = Window.partitionBy('category', 'object').rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn('group', sum('start_of_section').over(w)) \
  .groupBy('category', 'object', 'group').agg(collect_list('value').alias('list_value')) \
  .drop('group').show()

+--------+------+-------------+
|category|object|   list_value|
+--------+------+-------------+
|   fruit| apple|[60, 160, 30]|
|   fruit| apple|     [50, 30]|
|   fruit| apple|    [60, 110]|
+--------+------+-------------+

FAILS: monotonically_increasing_id fails when you have many partitions.

df.repartition(7) \
  .withColumn('id', monotonically_increasing_id()) \
  .withColumn('group', sum('start_of_section').over(w)) \
  .groupBy('category', 'object', 'group').agg(collect_list('value').alias('list_value')) \
  .drop('group').show()

+--------+------+--------------------+
|category|object|          list_value|
+--------+------+--------------------+
|   fruit| apple|                [60]|
|   fruit| apple|[60, 160, 30, 30,...|
|   fruit| apple|                [50]|
+--------+------+--------------------+

This is totally not wanted.