EDIT [1]: Modified the sample data to show the situation accurately.
EDIT [2]: Added (test) code to show how it should work.
I have a (PySpark) dataframe that looks like this:
Sample data (shown in 2 columns):
ID Type Other ID Type Other
-------------- --------------
101 A 1 105 A 1
101 A 2 105 A 2
101 B 1 105 B 1
102 A 1 105 A 4
102 A 2 106 A 1
103 A 1 106 A 2
103 A 2 201 B 1
103 B 1 201 A 1
104 A 1 202 B 1
104 A 2 203 B 1
104 A 3 203 B 2
I am already partitioning by ID.
How do I partition this data so that the type is also spread equally across multiple partitions?
So, if I just partition by ID, and I'm making 3 partitions, I could get this:
Partitions:
1 2 3
--- --- ---
102 A * 101 A * 201 B *
104 A * 103 A * 202 B *
106 A * 105 A * 203 B *
101 B * 201 A *
103 B *
105 B *
However, dealing with B is more computationally expensive than dealing with A, so I get a bottleneck on partition 3 that only has Type B.
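For reference, the layout above is roughly what I get from a plain repartition on the ID column alone (here df and num_part stand for my dataframe and target partition count, as in the test code further down):

import pyspark.sql.functions as f

# Partitioning on ID only: rows with the same ID stay together, but the types are
# distributed by hash, so all of the expensive Type B IDs can land on one partition.
df_naive = df.repartition(num_part, 'id')
df_naive.select('id', 'type', f.spark_partition_id().alias('part_id')).distinct().show()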
Ideally, I'd like to partition the data like this:
1 2 3
--- --- ---
101 A * 103 A * 105 A *
102 A * 104 A * 106 A *
201 B * 202 B * 203 B *
101 B * 103 B * 105 B *
201 A *
How do I do that?
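Put differently, within each type I want the (ID, type) groups dealt out evenly over the partitions. A rough illustration of that kind of round-robin assignment (plain Python, not Spark, and not necessarily the exact grouping shown above; the IDs are taken from the sample data):

num_part = 3
ids_per_type = {'A': [101, 102, 103, 104, 105, 106, 201],
                'B': [101, 103, 105, 201, 202, 203]}
for typ, ids in ids_per_type.items():
    for i, id_ in enumerate(sorted(ids)):
        # deal the IDs of each type out round-robin over the partitions
        print(f"{id_} {typ} -> partition {(i % num_part) + 1}")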
The following code is an example of what I want to do:
import random
import pyspark.sql.functions as f
from pyspark.sql import Row
from pyspark.sql import DataFrame
from test_common.test_base import PySparkTest
RANDOM = random.Random()
def spread_values_OVER_partitions(df_input: DataFrame, concentrate_col_name: str, spread_col_name: str) -> DataFrame:
"""This method SHOULD partition a dataframe so that the first column is partitioned normally, but the "spread_col_name" column is spread over all partitions.
Args:
df_input: The dataframe to partitions
concentrate_col_name: The column (name) on which you should (normally) partition.
spread_col_name: The column (name) over which values should be SPREAD over the partitions.
Returns:
The repartitioned dataframe.
"""
# THIS DOES NOT WORK!
return df_input.repartition(3, concentrate_col_name, spread_col_name)
class PartitionSpreadTest(PySparkTest):
def test_spread_partitioning(self):
"""Test how to spread a certain columns values *OVER* partitions, instead of concentrating them."""
test_data_tuple = [(id, 'A', other) for id in range(101, 106) for other in range(1, RANDOM.randint(3, 4))]
test_data_tuple.extend([(id, 'B', other) for id in [104] + list(range(201, 204)) for other in range(1, RANDOM.randint(4, 5))])
test_data_dict = [{'id': r[0],
'type': r[1],
'other': r[2],
}
for r in test_data_tuple]
df_test = self.spark.createDataFrame(Row(**x) for x in test_data_dict)
num_part = 3
df_test.groupby('id', 'type').agg(f.count('id')).orderBy('id', 'type').show(100, False)
# This DOES NOT WORK!
df_repartitioned = spread_values_OVER_partitions(df_test, concentrate_col_name='id', spread_col_name='type')
partition_cols = ['id', 'type']
print(f"Num partitions: [{num_part:3}]: \n")
# For every (id, type) combination, list the partition IDs its rows ended up in
(df_repartitioned.select(
*partition_cols,
f.spark_partition_id().alias('part_id'))
.distinct()
.groupBy(*partition_cols)
.agg(f.collect_list('part_id').alias('part_ids'))
.withColumn('num_parts', f.size('part_ids'))
.orderBy('part_ids', *partition_cols).select(
*partition_cols,
'part_ids',
'num_parts')
.show(1000, False))
However, the above code outputs this:
+---+----+--------+---------+
|id |type|part_ids|num_parts|
+---+----+--------+---------+
|101|A |[0] |1 |
|104|A |[0] |1 |
|105|A |[0] |1 |
|202|B |[0] |1 |
|203|B |[0] |1 |
|104|B |[1] |1 |
|201|B |[1] |1 |
|102|A |[2] |1 |
|103|A |[2] |1 |
+---+----+--------+---------+
In this case,
- partition [1] only contains type B
- partition [2] only contains type A
which is the opposite of what I want.
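For what it's worth, one idea I can think of (an untested sketch; spread_by_synthetic_key and part_key are just names I made up) is to compute an explicit partition key per (id, type) group with a window function and then repartition on that key. I don't know whether this is the right approach, especially since repartition hashes the key, so distinct key values can still end up in the same partition:

from pyspark.sql import DataFrame, Window
import pyspark.sql.functions as f

def spread_by_synthetic_key(df_input: DataFrame, concentrate_col_name: str,
                            spread_col_name: str, num_partitions: int) -> DataFrame:
    """Untested idea: deal the (concentrate, spread) groups of each spread value
    out round-robin via a synthetic part_key column, then repartition on that key."""
    w = Window.partitionBy(spread_col_name).orderBy(concentrate_col_name)
    df_keys = (df_input
               .select(concentrate_col_name, spread_col_name)
               .distinct()
               .withColumn('part_key', (f.row_number().over(w) - 1) % num_partitions))
    # NOTE: repartition hash-partitions part_key, so two different key values can
    # still end up in the same partition, which is part of what I am unsure about.
    return (df_input
            .join(df_keys, on=[concentrate_col_name, spread_col_name])
            .repartition(num_partitions, 'part_key'))

Is something along these lines the way to go, or is there a cleaner way to get Spark to spread one column's values over the partitions while keeping another column's values together?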