I'm trying to maximize cluster usage for a simple task.

The cluster is 1 master + 2 core nodes, all m3.xlarge, running Spark 1.3.1, Hadoop 2.4 and Amazon AMI 3.7.

The task reads all lines of a text file and parses them as CSV.

When I spark-submit the task in yarn-cluster mode, I get one of the following results:

  • 0 executors: the job waits indefinitely until I kill it manually
  • 1 executor: the job underutilizes resources, with only 1 machine working
  • OOM when I do not assign enough memory to the driver

What I would have expected:

  • The Spark driver running on the cluster master with all of its memory available, plus 2 executors with 9404 MB each (as defined by the install-spark script).

Sometimes, when I get a "successful" execution with 1 executor, cloning and restarting the step ends up with 0 executors.

I created my cluster using this command:

aws emr --region us-east-1 create-cluster --name "Spark Test" \
--ec2-attributes KeyName=mykey \
--ami-version 3.7.0 \
--use-default-roles \
--instance-type m3.xlarge \
--instance-count 3 \
--log-uri s3://mybucket/logs/ \
--bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=["-x"] \
--steps Name=Sample,Jar=s3://elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--master,yarn,--deploy-mode,cluster,--class,my.sample.spark.Sample,s3://mybucket/test/sample_2.10-1.0.0-SNAPSHOT-shaded.jar,s3://mybucket/data/],ActionOnFailure=CONTINUE

With some step variations including:

--driver-memory 8G --driver-cores 4 --num-executors 2


Running the install-spark script with -x produces the following spark-defaults.conf:

$ cat spark-defaults.conf
spark.eventLog.enabled  false
spark.executor.extraJavaOptions         -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
spark.driver.extraJavaOptions         -Dspark.driver.log.level=INFO
spark.executor.instances        2
spark.executor.cores    4
spark.executor.memory   9404M
spark.default.parallelism       8

Update 1

I get the same behavior with a generic JavaWordCount example:

/home/hadoop/spark/bin/spark-submit --verbose --master yarn --deploy-mode cluster --driver-memory 8G --class org.apache.spark.examples.JavaWordCount /home/hadoop/spark/lib/spark-examples-1.3.1-hadoop2.4.0.jar s3://mybucket/data/

However, if I remove the '--driver-memory 8G', the task gets assigned 2 executors and finishes correctly.

So why does setting --driver-memory prevent my task from getting executors?

Should the driver run on the cluster's master node, alongside the YARN master container, as explained here?

How do I give more memory to my Spark job driver (where collect() and some other useful operations run)?


4 Answers

Answer 1 (21 votes)

The solution to maximize cluster usage is to drop the '-x' parameter when installing Spark on EMR and to tune executor memory and cores by hand.

This post gives a pretty good explanation of how resources allocation is done when running Spark on YARN.

One important thing to remember is that all executors must be allocated the same resources: at the time of writing, Spark does not support heterogeneous executors. (Some work is under way to support GPUs, but that is another topic.)

So, to give the driver as much memory as possible while also maximizing executor memory, I should split my nodes like this (this SlideShare deck has good screenshots on page 25):

  • Node 0 - Master (Yarn resource manager)
  • Node 1 - NodeManager(Container(Driver) + Container(Executor))
  • Node 2 - NodeManager(Container(Executor) + Container(Executor))

NOTE: Another option would be to spark-submit with --master yarn --deploy-mode client from master node 0. Is there any counterexample showing this is a bad idea?

In my example, I can have at most 3 executors with 2 vcores and 4736 MB each, plus a driver with the same specs.

The 4736 MB figure is derived from the value of yarn.nodemanager.resource.memory-mb defined in /home/hadoop/conf/yarn-site.xml. On an m3.xlarge, it is set to 11520 MB (see here for the values associated with each instance type).

Then, we get:

(11520 - 1024) / 2 (containers per node) = 5248 => 5120 (rounded down to a 256 MB increment, as defined by yarn.scheduler.minimum-allocation-mb)

7% * 5120 = 358, which is below the 384 MB minimum, so the memory overhead is 384 MB (the factor becomes 10% in Spark 1.4)

5120 - 384 = 4736
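The same calculation as a small Python sketch. It hard-codes the figures quoted above (11520 MB per node, 1024 MB set aside, a 256 MB increment) and assumes Spark 1.3's overhead rule of max(7%, 384 MB); the function names are illustrative only. Spark applies the 7% to the executor heap rather than to the container, so computing it from the container size, as here, is slightly conservative.

```python
MIN_ALLOCATION_MB = 256   # yarn.scheduler.minimum-allocation-mb
OVERHEAD_FACTOR = 0.07    # Spark 1.3 default; becomes 0.10 in Spark 1.4
OVERHEAD_MIN_MB = 384     # minimum memory overhead per container


def container_size_mb(node_memory_mb: int, containers_per_node: int,
                      reserved_mb: int = 1024) -> int:
    """Per-container share of a node, rounded down to the allocation increment."""
    raw = (node_memory_mb - reserved_mb) // containers_per_node
    return raw // MIN_ALLOCATION_MB * MIN_ALLOCATION_MB


def executor_heap_mb(node_memory_mb: int, containers_per_node: int) -> int:
    """Value for --executor-memory so that heap + overhead fits in the container."""
    container = container_size_mb(node_memory_mb, containers_per_node)
    overhead = max(int(OVERHEAD_FACTOR * container), OVERHEAD_MIN_MB)
    return container - overhead


print(executor_heap_mb(11520, 2))  # → 4736
```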


Answer 2 (2 votes)

The issue lies in expectations about how Spark works on YARN. When Spark is run with a deploy mode of cluster (or with master set to yarn-cluster), the driver is not executed on the master node but in the Application Master container on one of the slave nodes. For more details see https://spark.apache.org/docs/latest/running-on-yarn.html

I expect what is happening is that the cluster cannot fulfill the memory requirements for the driver (remember that the memory actually requested from the cluster is what you ask for plus an overhead), and thus it waits forever to allocate either the Application Master, where the driver will run, or the executors.
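To make the overhead point concrete, here is a rough simulation of the question's setup. It assumes the figures quoted in the first answer (11520 MB of YARN memory per m3.xlarge node, requests rounded up to 256 MB), Spark 1.3's overhead of max(7%, 384 MB) for both driver and executors, a 512 MB default driver, and a naive first-fit placement that only approximates what the YARN scheduler really does. Under those assumptions, an 8G driver leaves room for just one 9404 MB executor, while the default driver leaves room for two, which matches the behaviour in the question's update.

```python
import math

NODE_MEMORY_MB = 11520    # yarn.nodemanager.resource.memory-mb on m3.xlarge (first answer)
MIN_ALLOCATION_MB = 256   # yarn.scheduler.minimum-allocation-mb (first answer)


def container_request_mb(heap_mb: int) -> int:
    """Heap plus Spark 1.3 overhead, rounded up to the YARN allocation increment."""
    overhead = max(int(0.07 * heap_mb), 384)
    return math.ceil((heap_mb + overhead) / MIN_ALLOCATION_MB) * MIN_ALLOCATION_MB


def executors_granted(driver_mb: int, executor_mb: int, executors_wanted: int,
                      nodes: int = 2) -> int:
    """Naive first-fit: place the driver (ApplicationMaster) first, then executors."""
    free = [NODE_MEMORY_MB] * nodes
    driver = container_request_mb(driver_mb)
    for i in range(nodes):
        if free[i] >= driver:
            free[i] -= driver
            break
    else:
        return 0  # no node can host the ApplicationMaster at all
    executor = container_request_mb(executor_mb)
    granted = 0
    for i in range(nodes):
        while granted < executors_wanted and free[i] >= executor:
            free[i] -= executor
            granted += 1
    return granted


print(executors_granted(8192, 9404, 2))  # --driver-memory 8G → 1
print(executors_granted(512, 9404, 2))   # assumed 512 MB default → 2
```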

To give the driver the amount of memory you are requesting, you would need additional slaves to provide resources for the cluster-based driver and the executors at the same time. With the overhead on the driver, I suspect you may need an instance type with more memory. When you request 8G for the driver, take a look at the ResourceManager log and verify the real amount requested.

To run the driver on the master node, the deploy mode would need to be client. This can still be done with EMR steps: use one step to call a script that localizes the driver jars onto the master node, and a second step that calls spark-submit with deploy mode client, referencing the JAR on the master's local file system.
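A sketch of that two-step approach as an EMR step list for boto3. The copy script URL, the local jar path and the step names are hypothetical placeholders, and the copy script itself (which would fetch the jar from S3 onto the master's local disk) is not shown; only the script-runner jar and the spark-submit path come from the question.

```python
from typing import Dict, List

# script-runner jar used by the question's original step
SCRIPT_RUNNER = "s3://elasticmapreduce/libs/script-runner/script-runner.jar"


def client_mode_steps(jar_s3: str, local_jar: str, main_class: str,
                      copy_script_s3: str, app_args: List[str]) -> List[Dict]:
    """Build two EMR steps: localize the jar, then spark-submit in client mode."""
    copy_step = {
        "Name": "Localize driver jar",
        # Stop here if the copy fails, so the submit step never runs without its jar.
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": SCRIPT_RUNNER,
            # Hypothetical script: copies jar_s3 to local_jar on the master node.
            "Args": [copy_script_s3, jar_s3, local_jar],
        },
    }
    submit_step = {
        "Name": "Spark client mode",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": SCRIPT_RUNNER,
            "Args": ["/home/hadoop/spark/bin/spark-submit",
                     "--master", "yarn", "--deploy-mode", "client",
                     "--class", main_class, local_jar, *app_args],
        },
    }
    return [copy_step, submit_step]
```

The returned list can be passed as `Steps` to `boto3.client("emr").add_job_flow_steps(JobFlowId=..., Steps=...)`.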

Answer 3 (1 vote)

Michel Lemay's post is good background reading, and he gives an answer for one particular cluster configuration. I've embedded that logic into a spreadsheet that shows the best options for any cluster. To use it, fill in the number of nodes in the cluster, the number of virtual cores per node, and the amount of allocatable memory per node. The sheet then gives you launch commands that fully utilize your cluster, in both client and cluster mode, for 1, 2, 4 and 8 executors per node. I've highlighted the row for 2 executors per node, as this has consistently been the best option in my tests. Feel free to copy the sheet or add tabs for different cluster types.

https://docs.google.com/spreadsheets/d/1VH7Qly308hoRPu5VoLIg0ceolrzen-nBktRFkXHRrY4/edit?usp=sharing
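The spreadsheet's exact formulas are not reproduced here. As a rough approximation of the same idea, this sketch applies the first answer's arithmetic (1024 MB set aside per node, 256 MB increments, Spark 1.3's max(7%, 384 MB) overhead) to 1, 2, 4 and 8 executors per node, and in cluster mode gives one container's worth of resources to the driver. It ignores the small ApplicationMaster container that client mode still launches; all names are illustrative.

```python
from dataclasses import dataclass
from typing import List

MIN_ALLOCATION_MB = 256   # yarn.scheduler.minimum-allocation-mb
OVERHEAD_FACTOR = 0.07    # Spark 1.3 default; 0.10 from Spark 1.4
OVERHEAD_MIN_MB = 384
RESERVED_MB = 1024        # per-node headroom taken from the first answer's formula


@dataclass
class Option:
    executors_per_node: int
    num_executors: int
    cores: int
    memory_mb: int
    cluster_mode: bool

    def flags(self) -> str:
        """spark-submit flags for this option."""
        out = (f"--num-executors {self.num_executors} "
               f"--executor-cores {self.cores} "
               f"--executor-memory {self.memory_mb}m")
        if self.cluster_mode:
            # The driver gets a container of the same size as an executor.
            out += f" --driver-cores {self.cores} --driver-memory {self.memory_mb}m"
        return out


def sizing_options(nodes: int, vcores_per_node: int, memory_per_node_mb: int,
                   cluster_mode: bool) -> List[Option]:
    options = []
    for per_node in (1, 2, 4, 8):
        cores = vcores_per_node // per_node
        container = (memory_per_node_mb - RESERVED_MB) // per_node
        container = container // MIN_ALLOCATION_MB * MIN_ALLOCATION_MB
        heap = container - max(int(OVERHEAD_FACTOR * container), OVERHEAD_MIN_MB)
        if cores == 0 or heap <= 0:
            continue  # not enough vcores or memory for this many executors per node
        slots = nodes * per_node
        # In cluster mode one slot hosts the driver instead of an executor.
        executors = slots - 1 if cluster_mode else slots
        options.append(Option(per_node, executors, cores, heap, cluster_mode))
    return options


for opt in sizing_options(nodes=2, vcores_per_node=4,
                          memory_per_node_mb=11520, cluster_mode=True):
    print(opt.executors_per_node, opt.flags())
```

For the question's 2 x m3.xlarge workers, the 2-per-node row reproduces the first answer's result: 3 executors with 2 cores and 4736 MB each, plus a driver of the same size.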

Answer 4 (1 vote)

Here is how I get around the problem:

By keeping spark.executor.memory + driver memory below the total memory of any given node, YARN is able to place both the Application Master (which hosts the driver) and an executor on the same node. You sacrifice some memory on the other nodes, but it's more important to me that the CPUs are kept busy. Here is an example (on r3.8xlarge):

aws emr add-steps --cluster-id j-1234 --steps Type=Spark,Name=foob3,Args=[--conf,spark.memory.fraction=0.95,--conf,spark.memory.storageFraction=0.1,--conf,spark.yarn.executor.memoryOverhead=8000,--conf,spark.executor.memory=200g,--conf,spark.executor.cores=32,--conf,spark.executor.instances=4,--conf,spark.dynamicAllocation.enabled=false,--class,myclass.Foo,--deploy-mode,cluster,--master,yarn,--driver-memory,10g,s3://myjar-1.0-SNAPSHOT.jar],ActionOnFailure=CONTINUE