The first point is that if YARN has enough resources available (nodes, CPUs and memory), it can use dynamic allocation to create Spark executors with appropriate default cores and memory.
In my case I needed to turn off dynamic allocation because my resource levels were very low.
So from pyspark I set the following values:
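from pyspark import SparkConf

# Fixed, deliberately small allocation instead of dynamic allocation
conf = (SparkConf().setAppName("simple")
        .set("spark.shuffle.service.enabled", "false")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.cores.max", "1")  # documented for standalone/Mesos; kept from the original
        .set("spark.executor.instances", "2")  # number of executors to request
        .set("spark.executor.memory", "200m")  # heap size per executor
        .set("spark.executor.cores", "1"))  # cores per executor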
Note: the values set here must not exceed the resources actually available. Values that are too small, however, can lead to out-of-memory errors or slow performance when your job runs.
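To make the note above concrete, here is a rough sketch of how much the settings add up to. It assumes Spark's documented default executor memory overhead on YARN (10% of executor memory, with a minimum of 384 MiB) and ignores YARN's rounding to its minimum allocation size as well as the container used by the application master. The function names are made up for illustration and are not part of PySpark.

```python
def parse_memory_mb(value):
    """Convert a Spark-style size string such as '200m' or '1g' to MiB."""
    units = {"m": 1, "g": 1024}
    value = value.strip().lower()
    if not value or value[-1] not in units:
        raise ValueError("expected a size like '200m' or '1g', got %r" % value)
    return int(float(value[:-1]) * units[value[-1]])


def container_memory_mb(executor_memory, overhead_factor=0.10, min_overhead_mb=384):
    """Executor heap plus the assumed default memory overhead, in MiB."""
    heap = parse_memory_mb(executor_memory)
    overhead = max(min_overhead_mb, int(heap * overhead_factor))
    return heap + overhead


def total_request(instances, executor_memory, executor_cores):
    """Approximate total memory and cores requested for all executors."""
    return {
        "memory_mb": instances * container_memory_mb(executor_memory),
        "cores": instances * executor_cores,
    }


# The settings used above: 2 executors, 200m each, 1 core each
request = total_request(2, "200m", 1)
print(request)
```

Under these assumptions, even a 200m executor asks YARN for noticeably more than 200 MiB per container, so it is worth comparing the total against what the NodeManagers actually offer.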
The complete code gist of a sample job is available here
Another important point to note for this pyspark case is that Spark on YARN can run in two modes:
- cluster mode - the Spark driver runs inside the YARN application master, on a node in the cluster
- client mode - the Spark driver runs on the client side, where the interactive shell is run.
Cluster mode is not well suited to using Spark interactively. Spark applications that require user input, such as spark-shell and pyspark, require the Spark driver to run inside the client process that initiates the Spark application.
Client mode can be set in the environment as below:
export PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client pyspark-shell'
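If exporting the variable in the shell is inconvenient (for example in a notebook), the same value can be set from Python, as long as this happens before the SparkContext is created. This is only a sketch: `build_submit_args` is a hypothetical helper, not part of PySpark.

```python
import os


def build_submit_args(master="yarn", deploy_mode="client"):
    """Build the PYSPARK_SUBMIT_ARGS string for the given master and deploy mode."""
    return "--master {} --deploy-mode {} pyspark-shell".format(master, deploy_mode)


# Must be set before the SparkContext is created,
# because PySpark reads it when it launches the JVM.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args()
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```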