
When I run PySpark code created in a Jupyter Notebook from the Web Interfaces of a Dataproc cluster, I found that the running code does not use all of the resources of either the master node or the worker nodes; it uses only part of them. An answer to another question here suggested fixing this by "changing the scheduler properties to FIFO".

I have two questions here: 1) How can I change the scheduler properties?
2) Is there any other way to make PySpark use all of the resources, other than changing the scheduler properties?

Thanks in advance


1 Answer


If you are just trying to acquire more resources, you do not want to change the Spark scheduler. Rather, you want to ensure that your data is split into enough partitions, that you have enough executors, and that each executor has enough memory, and so on, to make your job run well.

Some properties you may want to consider (see the sketch after this list for how to set them):

  • spark.executor.cores - Number of CPU threads per executor.
  • spark.executor.memory - The amount of memory to be allocated for each executor.
  • spark.dynamicAllocation.enabled=true - Enables dynamic allocation. This allows the number of Spark executors to scale with the demands of the job.
  • spark.default.parallelism - Configures the default parallelism for jobs. Aside from the storage partitioning scheme, this is the most important property to set correctly for a given job.
  • spark.sql.shuffle.partitions - Similar to spark.default.parallelism but for Spark SQL aggregation operations.
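
To make the list concrete, here is a minimal sketch of setting some of these properties from PySpark. The property names come from the list above; the values, the app name, and the assumption that you are building your own SparkSession (rather than reusing the spark session a Dataproc notebook already provides) are illustrative only.

    from pyspark.sql import SparkSession

    # Minimal sketch: property names are from the list above; the values and
    # the app name are placeholders to tune for your own cluster and job.
    spark = (
        SparkSession.builder
        .appName("resource-tuning-example")                # hypothetical name
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.default.parallelism", "200")        # example value
        .config("spark.sql.shuffle.partitions", "200")     # example value
        .getOrCreate()
    )

    # On an already-running session (e.g. the spark object a Dataproc Jupyter
    # notebook provides), runtime-settable properties such as
    # spark.sql.shuffle.partitions can still be changed:
    spark.conf.set("spark.sql.shuffle.partitions", "200")

Keep in mind that executor-level settings such as spark.executor.cores and spark.executor.memory only take effect when executors are launched, so they need to be set before the session or notebook kernel starts (for example via job-submission properties) rather than from inside a running session.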

Note that you most likely do not want to touch any of the above except for spark.default.parallelism and spark.sql.shuffle.partitions (unless you're setting explicit RDD partition counts in your code, as in the sketch below). YARN and Spark on Dataproc are configured such that (if no other jobs are running) a given Spark job will occupy all worker cores and most of the worker memory; some memory is still reserved for system resources.
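
If you do control partitioning explicitly in code, the sketch below shows the usual hooks; the GCS paths and the partition count of 200 are made-up examples.

    # Sketch: explicit partition counts in code; paths and counts are examples.
    # Ask for a minimum number of partitions when reading an RDD:
    rdd = spark.sparkContext.textFile("gs://your-bucket/input/*.txt",
                                      minPartitions=200)

    # For DataFrames, check how finely the input was split and repartition if needed:
    df = spark.read.json("gs://your-bucket/events/")
    print(df.rdd.getNumPartitions())   # partition count from the input splits
    df = df.repartition(200)           # force a higher (or more even) count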

If you have already set spark.default.parallelism sufficiently high and are still seeing low cluster utilization, then your job may not be large enough to require those resources, or your input dataset may not be sufficiently splittable.

Note that if you're using HDFS or GCS (Google Cloud Storage) for your data storage, the default block size is 64 MiB or 128 MiB respectively. Input data is not split beyond block size, so your initial parallelism (partition count) will be limited to data_size / block_size. It does not make sense to have more executor cores than partitions because those excess executors will have no work to do.