
Given a Spark DataFrame which looks something like this:

==================================
| Name | Col1 | Col2 | .. | ColN |
----------------------------------
|    A |    1 |   11 | .. |   21 |
|    A |   31 |   41 | .. |   51 |
|    B |    2 |   12 | .. |   22 |
|    B |   32 |   42 | .. |   52 |
==================================

I would like to run logic which carries out an aggregation/computation for a partition of the table which corresponds to a particular Name value. Said logic requires that the full contents of the partition -- and only that partition -- be materialized in memory on the node executing the logic; it would look something like the processSegment function below:

def processDataMatrix(dataMatrix):
    # do some number crunching on a 2-D matrix
    pass

def processSegment(dataIter):
    # "running" value of the Name column in the iterator
    dataName = None
    # as the iterator is processed, put the data in a matrix
    dataMatrix = []

    for dataTuple in dataIter:
        # separate the name column from the other columns
        (name, *values) = dataTuple
        # SANITY CHECK: ensure that all rows have same name
        if (dataName is None):
            dataName = name
        else:
            assert (dataName == name), 'row name ' + str(name) + ' does not match expected ' + str(dataName)

        # put the row in the matrix
        dataMatrix.append(values)

    # if any rows were processed, number-crunch the matrix
    if (dataName is not None):
        return processDataMatrix(dataMatrix)
    else:
        return []

I have tried to make this work by repartitioning based on the Name column, then running processSegment on each partition via mapPartitions on the underlying RDD:

result = \
    stacksDF \
        .repartition('Name') \
        .rdd \
        .mapPartitions(processSegment) \
        .collect()

However, the process routinely fails the SANITY CHECK assertion in processSegment:

AssertionError: row name Q7 does not match expected A9

Why is the partitioning ostensibly executed on the DataFrame not being preserved when I attempt to run mapPartitions on the underlying RDD? If the approach above is not valid, is there some approach (using either the DataFrame API or the RDD API) which will enable me to carry out aggregation logic on the in-memory rendition of a DataFrame partition?

(As I am using PySpark, and the particular number-crunching logic I wish to execute is Python, user-defined aggregation functions (UDAFs) would not appear to be an option.)


1 Answer


I believe that you misunderstood how partitioning works. In general, a partitioner is a surjective function, not a bijective one. While all records for a specific value will be moved to a single partition, a partition may contain records with multiple different values.
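
You can see this directly on your data. The following sketch (df here is the same DataFrame you call stacksDF) is only an illustration on my part: it tags each row with its partition id after repartition('Name') and counts distinct names per partition; counts greater than one confirm that the hash partitioner folds several names into the same partition:

from pyspark.sql.functions import spark_partition_id, countDistinct

# tag each row with its partition id, then count distinct names per partition
(df
    .repartition("Name")
    .withColumn("pid", spark_partition_id())
    .groupBy("pid")
    .agg(countDistinct("Name").alias("names_in_partition"))
    .show())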

The DataFrame API doesn't give you any control over the partitioner, but it is possible to define a custom partitionFunc when using the RDD API. That means you can use one which is bijective, for example:

# build a Name -> partition index lookup on the driver
mapping = (df
    .select("Name")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .zipWithIndex()
    .collectAsMap())

# bijective partitioner: each distinct Name gets its own partition
def partitioner(x):
    return mapping[x]

and use it as follows:

df.rdd.map(lambda row: (row.Name, row)).partitionBy(len(mapping), partitioner)
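
To close the loop, here is a rough sketch of how this could feed your processSegment function. It assumes, as in your example table, that Name is the first column, so that a Row unpacks the same way as the tuples processSegment already expects:

# partition by Name (one name per partition), drop the key, then crunch
result = (df.rdd
    .map(lambda row: (row.Name, row))
    .partitionBy(len(mapping), partitioner)
    .map(lambda kv: kv[1])  # Row behaves like a tuple, so (name, *values) still works
    .mapPartitions(processSegment)
    .collect())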

Although it is possible, you have to remember that partitions are not free, and if the number of unique values is large it can become a serious performance issue.
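
If you would rather not manage partitions yourself, a rough alternative under the same memory constraint (every group must fit on a single executor) is RDD.groupBy, which also keeps all rows for a given Name together; list(row)[1:] below simply drops the Name field, again assuming it is the first column:

# group rows by Name and crunch each group's value columns as one matrix
result = (df.rdd
    .groupBy(lambda row: row.Name)
    .mapValues(lambda rows: processDataMatrix([list(row)[1:] for row in rows]))
    .collect())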