When running input-process-output style Spark jobs, there are three separate issues to consider:
- Input parallelism
- Processing parallelism
- Output parallelism
In your case, the input parallelism is 1 because, per your question, you cannot change the input format or granularity. You are also doing essentially no processing, so you can't get any gains there.
However, you can control the output parallelism, which will give you two benefits:
- Multiple CPUs will write, thus decreasing the total time of the write operation.
- Your output will be split into multiple files, allowing you to take advantage of input parallelism in later processing.
To increase parallelism, you have to increase the number of partitions, which you can do with `repartition()`, e.g.,
```scala
val numPartitions = ...
input.repartition(numPartitions).write.parquet("s3n://temp-output/")
```
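As a quick sanity check (assuming `input` is a DataFrame, as above), you can compare the partition count before and after; the value 8 here is purely illustrative:

```scala
println(input.rdd.partitions.length)                 // likely 1 in your case
println(input.repartition(8).rdd.partitions.length)  // 8
```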
When choosing the optimal number of partitions, there are a number of different factors to consider.
- Data size
- Partition skew
- Cluster RAM size
- Number of cores in the cluster
- The type of follow-on processing you'll do
- The size of cluster (RAM & cores) you'll use for follow-on processing
- The system you are writing to
Without knowing your goals and constraints, it is difficult to make a solid recommendation, but here are a couple of general guidelines to work with:
Since your partitions won't be skewed (the above use of `repartition` redistributes rows evenly across partitions, which corrects for skew), you will get the fastest throughput if you set the number of partitions equal to the number of executor cores, assuming that you are using nodes with sufficient I/O.
When you process data, you really want an entire partition to be able to "fit" in the RAM allocated to a single executor core. What "fit" means here depends on your processing. If you are doing a simple `map` transformation, the data may be streamed. If you are doing something involving ordering, the RAM requirements grow substantially.

If you are using Spark 1.6+, you'll get the benefit of more flexible memory management. If you are using an earlier version, you'll have to be more careful: job execution grinds to a halt when Spark has to start "buffering" to disk.

On-disk size and in-RAM size can be very, very different. The latter varies based on how you process the data and how much benefit Spark can get from predicate pushdown (which Parquet supports). Use the Spark UI to see how much RAM the various job stages take.
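As a brief illustration of predicate pushdown (the column name here is hypothetical), a filter on a Parquet source can be evaluated by the reader itself, so row groups whose statistics exclude the predicate never get loaded into RAM:

```scala
// Hypothetical example: eventDate is an illustrative column name
val df = sqlContext.read.parquet("s3n://temp-output/")
df.filter(df("eventDate") >= "2016-01-01").count()  // filter pushed to the Parquet reader
```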
BTW, unless your data has a very specific structure, do not hard-code partition numbers because then your code will run sub-optimally on clusters of varying sizes. Instead, use the following trick to determine the number of executors in a cluster. You can then multiply by the number of cores per executor based on the machines you are using.
```scala
// -1 is for the driver node
val numExecutors = sparkContext.getExecutorStorageStatus.length - 1
```
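For example, assuming 4 cores per executor (a placeholder value; match it to your instance type), you could then derive the partition count like this:

```scala
val coresPerExecutor = 4  // placeholder; match your instance type
val numPartitions = numExecutors * coresPerExecutor
input.repartition(numPartitions).write.parquet("s3n://temp-output/")
```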
Just as a point of reference: on our team we use rather complex data structures, which means that RAM size >> disk size, and we aim to keep S3 objects in the 50-250MB range for processing on nodes where each executor core has 10-20GB of RAM.
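If you want to target an output object size like that, here is a rough sketch; the total size and target are illustrative numbers, and you would substitute your own measurements:

```scala
// Illustrative sizing: aim for ~200MB per output object
val totalSizeBytes = 10L * 1024 * 1024 * 1024  // hypothetical: 10GB of output on disk
val targetObjectBytes = 200L * 1024 * 1024     // midpoint of the 50-250MB range
val numPartitions = math.max(1, (totalSizeBytes / targetObjectBytes).toInt)
```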
Hope this helps.