
I have been benchmarking a PySpark job running on a Dataproc cluster and noticed a stubborn 'floor' to the processing time despite trying several different cluster configs. I'm wondering if this is due to file transfer latency between gs:// storage and Dataproc.

The source file is 60GB and stored in a bucket, under the same project, in the same region (us-central1) as my Dataproc cluster. The file is 1.43 billion lines, with 731 million records of 17 fields each, all on one line. The extra lines are all blank except for the header row.

In a 1 master, 4 worker config, with every node an n-standard-8 machine with a 300GB disk, the run times were 35:20 and 36:33. When I beefed up the cluster to 8 workers instead of 4 (still all n-standard-8), the time dropped to 21:14. Next I changed the workers to 4 n-highmem-32s while keeping the master at n-standard-8, which clocked in at 20:01. Finally, I really beefed up, switching to 1 master and 16 workers, all n-highmem-32. This run came in with the best time, at 15:36.

Here are the results of all my tests/configs, etc:

[Image: testing results]

I ran other tests with slight changes to caching in the script, but none of them were better than the above.

This makes me think the initial transfer of that 60GB file is a major factor. How long would you expect such a transfer to take, given it's all within GCP, under the same project, and in the same region? Should it take 10 minutes?

I'm also including the pySpark script in case the answer is in here:

[Image: pySpark script]

"The file is 1.43 billion lines, with 731 million records of 17 fields each, all on one line." Can you re-explain this? How long are the lines approximately? This might be your problem. - surj
Also you can likely use reduceByKey instead of groupByKey. - surj
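
To illustrate surj's suggestion, here is a minimal, hypothetical sketch of the difference; the bucket path, delimiter, and choice of key field are assumptions for illustration, since the actual script is only posted as an image:

```python
from pyspark import SparkContext

sc = SparkContext()

# Hypothetical read path: the real script is only shown as an image, so the
# path, delimiter, and key field here are assumptions.
records = (sc.textFile("gs://my-bucket/source-file.txt")
             .filter(lambda line: line.strip())          # drop the blank lines
             .map(lambda line: line.split(",")))

pairs = records.map(lambda fields: (fields[0], 1))

# groupByKey shuffles every value across the network before aggregating:
counts_grouped = pairs.groupByKey().mapValues(lambda vals: sum(1 for _ in vals))

# reduceByKey combines values on the map side first, so far less data is shuffled:
counts_reduced = pairs.reduceByKey(lambda a, b: a + b)
```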

1 Answer


In general, there shouldn't be a bottleneck related to file size as long as the input is split into reasonably fine-grained partitions; the underlying InputFormat is expected to chop the 60GB file into a large number of smaller byte ranges, roughly 64MB each, which are read independently by different worker tasks. Check the Spark web UI to see the number of tasks being run, their min/max/average duration, and the bytes processed per worker, so you can spot skew in the data splitting or anything else that leaves some workers idle instead of contributing to reading the file.
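
If it's easier than digging through the web UI, the partition count is also visible straight from the driver; a quick sketch (the path is a placeholder):

```python
# Quick check from the driver: how many partitions (and therefore read tasks)
# Spark created for the input. The path is a placeholder.
rdd = sc.textFile("gs://my-bucket/source-file.txt")
print("partitions:", rdd.getNumPartitions())

# Rough expectation with ~64MB splits: 60 * 1024 / 64 ≈ 960 partitions.
```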

If the number of partitions is small for some reason, you can try adjusting the minPartitions argument in the call to SparkContext.textFile(path, minPartitions).
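
For example, with the same placeholder path, you could ask for a higher floor on the split count:

```python
# Request at least ~1000 input splits; the InputFormat may still create more.
rdd = sc.textFile("gs://my-bucket/source-file.txt", minPartitions=1000)
print("partitions:", rdd.getNumPartitions())
```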