1
votes

EDIT [1]: Modified sample data to accurately show the situation.
EDIT [2]: Added (test) code to show how it should work.

I have a (PySpark) dataframe that looks like this:

Sample data (shown in 2 columns):

ID  Type Other      ID  Type Other
--------------      --------------
101 A    1          105 A     1
101 A    2          105 A     2
101 B    1          105 B     1
102 A    1          105 A     4
102 A    2          106 A     1
103 A    1          106 A     2
103 A    2          201 B     1
103 B    1          201 A     1
104 A    1          202 B     1
104 A    2          203 B     1
104 A    3          203 B     2

I am already partitioning by ID.

How do I partition this data so that the type is also spread equally across multiple partitions?

So, if I just partition by ID, and I'm making 3 partitions, I could get this:

Partitions:

1        2        3
---      ---      ---
102 A *  101 A *  201 B *
104 A *  103 A *  202 B *
106 A *  105 A *  203 B *
         101 B *  201 A *
         103 B *
         105 B *

However, dealing with B is more computationally expensive than dealing with A, so I get a bottleneck on partition 3 that only has Type B.

Ideally, I'd like to partition the data like this:

1        2        3
---      ---      ---
101 A *  103 A *  105 A *
102 A *  104 A *  106 A *
201 B *  202 B *  203 B *
101 B *  103 B *  105 B *
201 A *

How do I do that?

The following code is an example of what I want to do:

import random

import pyspark.sql.functions as f
from pyspark.sql import Row
from pyspark.sql import DataFrame

from test_common.test_base import PySparkTest

RANDOM = random.Random()


def spread_values_OVER_partitions(df_input: DataFrame, concentrate_col_name: str, spread_col_name: str) -> DataFrame:
    """This method SHOULD partition a dataframe so that the first column is partitioned normally, but the "spread_col_name" column is spread over all partitions.

    Args:
        df_input:               The dataframe to partition.
        concentrate_col_name:   The column (name) on which to (normally) partition.
        spread_col_name:        The column (name) whose values should be SPREAD over the partitions.

    Returns:
        The repartitioned dataframe.
    """

    # THIS DOES NOT WORK!
    return df_input.repartition(3, concentrate_col_name, spread_col_name)


class PartitionSpreadTest(PySparkTest):

    def test_spread_partitioning(self):
        """Test how to spread a certain columns values *OVER* partitions, instead of concentrating them."""

        test_data_tuple = [(id, 'A', other) for id in range(101, 106) for other in range(1, RANDOM.randint(3, 4))]
        test_data_tuple.extend([(id, 'B', other) for id in [104] + list(range(201, 204)) for other in range(1, RANDOM.randint(4, 5))])

        test_data_dict = [{'id':    r[0],
                           'type':  r[1],
                           'other': r[2],
                           }
                          for r in test_data_tuple]

        df_test = self.spark.createDataFrame(Row(**x) for x in test_data_dict)
        num_part = 3

        df_test.groupby('id', 'type').agg(f.count('id')).orderBy('id', 'type').show(100, False)

        # This DOES NOT WORK!
        df_repartitioned = spread_values_OVER_partitions(df_test, concentrate_col_name='id', spread_col_name='type')

        partition_cols = ['id', 'type']
        print(f"Num partitions: [{num_part:3}]: \n")
        # print partitions
        (df_repartitioned.select(
            *partition_cols,
            f.spark_partition_id().alias('part_id'))
         .distinct()
         .groupBy(*partition_cols)
         .agg(f.collect_list('part_id').alias('part_ids'))
         .withColumn('num_parts', f.size('part_ids'))
         .orderBy('part_ids', *partition_cols).select(
            *partition_cols,
            'part_ids',
            'num_parts')
         .show(1000, False))

However, the above code outputs this:

+---+----+--------+---------+
|id |type|part_ids|num_parts|
+---+----+--------+---------+
|101|A   |[0]     |1        |
|104|A   |[0]     |1        |
|105|A   |[0]     |1        |
|202|B   |[0]     |1        |
|203|B   |[0]     |1        |
|104|B   |[1]     |1        |
|201|B   |[1]     |1        |
|102|A   |[2]     |1        |
|103|A   |[2]     |1        |
+---+----+--------+---------+

In this case,

  • partition [1] only contains type B
  • partition [2] only contains type A

which is the opposite of what I want.

2
I'm starting to wonder if I need a window function on ('type', 'id') that assigns a different partition id ("row_number") per combination. 🤦 I was hoping that there was a built-in or more elegant solution than that. - Marco
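
A minimal sketch of that window-function idea (the spread_with_salt helper, the _salt column, and the modulo step are assumptions, not a verified solution) might look like this:

import pyspark.sql.functions as f
from pyspark.sql import DataFrame, Window


def spread_with_salt(df: DataFrame, concentrate_col: str, spread_col: str, num_part: int) -> DataFrame:
    """Rank each concentrate value within its spread value and use the rank as a salt column."""
    w = Window.partitionBy(spread_col).orderBy(concentrate_col)
    return (df
            # dense_rank numbers the distinct concentrate values (e.g. ids) within each spread value (e.g. type)
            .withColumn('_salt', (f.dense_rank().over(w) - 1) % num_part)
            # repartition on the salt so each type's ids are rotated over the partitions
            .repartition(num_part, '_salt')
            .drop('_salt'))

Note that repartition(num_part, '_salt') still hash-partitions the salt values, so two salt values can occasionally land in the same partition; this balances load on average rather than guaranteeing a strict round-robin.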

2 Answers

1
votes

If you need full flexibility, you could also:

  • convert the DataFrame to an RDD
  • apply a custom partitioner as shown below
  • convert back to a DataFrame if necessary

Partitioner

The Partitioner class is used to partition data based on keys. It takes two parameters, numPartitions and partitionFunc, at initialization, as the following signature shows:

def __init__(self, numPartitions, partitionFunc):

The first parameter defines the number of partitions, while the second defines the partition function.

Source

Here is some pseudo code showing the basic idea:

Pseudo code

# obviously this puts all rows into partition 0 only, so this function should get more complex
def myPartitionerFunc(key):
    return 0

# when defining the key-value paired RDD you could e.g. concatenate the 'ID' and 'Type' values
dfToRDD = df.rdd.map(lambda x: (x[0], x))
rdd = dfToRDD.partitionBy(3, myPartitionerFunc)
# drop the key again and reuse the original schema; the rows keep their new partition placement
dfPartitioned = spark.createDataFrame(rdd.map(lambda kv: kv[1]), df.schema)
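
To give an idea of how it could "get more complex" for this question, the partition function could key on both the ID and Type; the key format and the offset logic below are assumptions, not part of the answer above:

# Sketch only: assumes the paired RDD is keyed with an "id|type" string
# (column names follow the test code in the question).
def myPartitionerFunc(key):
    id_str, type_str = key.split('|')
    offset = 0 if type_str == 'A' else 1  # shift B rows relative to A rows
    return (int(id_str) + offset) % 3

pairedRDD = df.rdd.map(lambda row: (f"{row['id']}|{row['type']}", row))
partitionedRDD = pairedRDD.partitionBy(3, myPartitionerFunc)
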
-1
votes

You can repartition by any number of columns. In your case you can do:

df.repartition("ID", "Type")

to repartition by (hash of) ID and Type. Docs here.

However, note that if Type is dependent on ID (as your example suggests), it will not change much. This:

1        2        3
---      ---      ---
101 A *  104 A *  201 B *
102 A *  105 A *  202 B *
103 A *  106 A *  203 B *

is not really likely unless you repartition by range using ID alone. If you use standard hash partitioning, IDs (and therefore types) should be scattered randomly across partitions regardless of the keys you choose.
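
As a rough sketch of the two options being contrasted (column names follow the sample data above):

# Range partitioning on ID alone keeps consecutive IDs together, which gives
# layouts like 101-103 / 104-106 / 201-203 above.
df_by_range = df.repartitionByRange(3, "ID")

# Hash partitioning on ID and Type scatters the (ID, Type) combinations over
# the 3 partitions with no grouping by ID range.
df_by_hash = df.repartition(3, "ID", "Type")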