1
votes

My problem is that my pyspark job is not running in parallel.

Code and data format:
My PySpark looks something like this (simplified, obviously):

class TheThing:
    def __init__(self, dInputData, lDataInstance):
        # ...
    def does_the_thing(self):
        """About 0.01 seconds calculation time per row"""
        # ...
        return lProcessedData

#contains input data pre-processed from other RDDs
#done like this because one RDD cannot work with others inside its transformation
#is about 20-40MB in size
#everything in here loads and processes from BigQuery in about 7 minutes
dInputData = {'dPreloadedData': dPreloadedData}

#rddData contains about 3M rows
#is about 200MB large in csv format
#rddCalculated is about the same size as rddData
rddCalculated = (
    rddData
        .map(
            lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
        )
)

llCalculated = rddCalculated.collect()
#save as csv, export to storage

Running on Dataproc cluster:
Cluster is created via the Dataproc UI.
Job is executed like this:
gcloud --project <project> dataproc jobs submit pyspark --cluster <cluster_name> <script.py>

I observed the job status via the UI, started like this. Browsing through it I noticed that only one (seemingly random) of my worker nodes was doing anything. All others were completely idle.

Whole point of PySpark is to run this thing in parallel, and is obviously not the case. I've run this data in all sorts of cluster configurations, the last one being massive, which is when I noticed it's singular-node use. And hence why my jobs take too very long to complete, and time seems independent of cluster size.

All tests with smaller datasets pass without problems on my local machine and on the cluster. I really just need to upscale.

EDIT
I changed
llCalculated = rddCalculated.collect()
#... save to csv and export
to
rddCalculated.saveAsTextFile("gs://storage-bucket/results")

and only one node is still doing the work.

1

1 Answers

2
votes

Depending on whether you loaded rddData from GCS or HDFS, the default split size is likely either 64MB or 128MB, meaning your 200MB dataset only has 2-4 partitions. Spark does this because typical basic data-parallel tasks churn through data fast enough that 64MB-128MB means maybe tens of seconds of processing, so there's no benefit in splitting into smaller chunks of parallelism since startup overhead would then dominate.

In your case, it sounds like the per-MB processing time is much higher due to your joining against the other dataset and perhaps performing fairly heavyweight computation on each record. So you'll want a larger number of partitions, otherwise no matter how many nodes you have, Spark won't know to split into more than 2-4 units of work (which would also likely get packed onto a single machine if each machine has multiple cores).

So you simply need to call repartition:

rddCalculated = (
    rddData
        .repartition(200)
        .map(
            lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
        )
)

Or add the repartition to an earlier line:

rddData = rddData.repartition(200)

Or you may have better efficiency if you repartition at read time:

rddData = sc.textFile("gs://storage-bucket/your-input-data", minPartitions=200)