Given a Spark DataFrame which looks something like this:
==================================
| Name | Col1 | Col2 | .. | ColN |
----------------------------------
| A    | 1    | 11   | .. | 21   |
| A    | 31   | 41   | .. | 51   |
| B    | 2    | 12   | .. | 22   |
| B    | 32   | 42   | .. | 52   |
==================================
I would like to run logic which carries out an aggregation/computation on the partition of the table corresponding to a particular Name value. Said logic requires that the full contents of that partition -- and only that partition -- be materialized in memory on the node executing the logic; it would look something like the processSegment function below:
def processDataMatrix(dataMatrix):
    # do some number crunching on a 2-D matrix
    ...

def processSegment(dataIter):
    # "running" value of the Name column in the iterator
    dataName = None
    # as the iterator is processed, put the data in a matrix
    dataMatrix = []

    for dataTuple in dataIter:
        # separate the name column from the other columns
        (name, *values) = dataTuple

        # SANITY CHECK: ensure that all rows have the same name
        if dataName is None:
            dataName = name
        else:
            assert dataName == name, \
                'row name ' + str(name) + ' does not match expected ' + str(dataName)

        # put the row in the matrix
        dataMatrix.append(values)

    # if any rows were processed, number-crunch the matrix
    if dataName is not None:
        return processDataMatrix(dataMatrix)
    else:
        return []
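(For the sake of a self-contained test, processDataMatrix can be stubbed out with something trivial; the stub below is purely hypothetical, and the real number crunching is more involved.)

# hypothetical stand-in used only to exercise processSegment end-to-end;
# the real processDataMatrix does more involved number crunching
def processDataMatrix(dataMatrix):
    # sum each column of the 2-D matrix; return a single-element list,
    # since mapPartitions expects the function to yield an iterable
    numCols = len(dataMatrix[0])
    return [[sum(row[i] for row in dataMatrix) for i in range(numCols)]]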
I have tried to make this work by repartitioning based on the Name column, then running processSegment on each partition via mapPartitions on the underlying RDD:
result = \
    stacksDF \
        .repartition('Name') \
        .rdd \
        .mapPartitions(processSegment) \
        .collect()
However, the process routinely fails the SANITY CHECK assertion in processSegment:

AssertionError: row name Q7 does not match expected A9
Why is the partitioning ostensibly executed on the DataFrame not being preserved when I attempt to run mapPartitions on the underlying RDD? If the approach above is not valid, is there some approach (using either the DataFrame API or the RDD API) which will enable me to carry out aggregation logic on the in-memory rendition of a DataFrame partition?
(As I am using PySpark, and the particular number-crunching logic I wish to execute is Python, user-defined aggregation functions (UDAFs) would not appear to be an option.)
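For completeness, one alternative I have considered (an untested sketch, which assumes Name is the first column of each row) is to key the underlying RDD by Name and group with groupByKey, so that each group is materialized on a single executor before being number-crunched:

# key each row by its Name value, group all rows for a given Name onto one
# executor, then run the number-crunching on the materialized group
resultAlt = \
    stacksDF \
        .rdd \
        .map(lambda row: (row[0], list(row[1:]))) \
        .groupByKey() \
        .mapValues(lambda rows: processDataMatrix(list(rows))) \
        .collect()

However, I do not know whether shuffling every row and materializing each group as a list via groupByKey is the idiomatic way to achieve this, or whether it will scale.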