23
votes

I'm running a job on Apache Spark on Amazon Elastic Map Reduce (EMR). Currently I'm running on emr-4.1.0 which includes Amazon Hadoop 2.6.0 and Spark 1.5.0.

When I start the job, YARN correctly has allocated all the worker nodes to the spark job (with one for the driver, of course).

I have the magic "maximizeResourceAllocation" property set to "true", and the spark property "spark.dynamicAllocation.enabled" also set to "true".

However, if I resize the emr cluster by adding nodes to the CORE pool of worker machines, YARN only adds some of the new nodes to the spark job.

For example, this morning I had a job that was using 26 nodes (m3.2xlarge, if that matters) - 1 for the driver, 25 executors. I wanted to speed up the job so I tried adding 8 more nodes. YARN has picked up all of the new nodes, but only allocated 1 of them to the Spark job. Spark did successfully pick up the new node and is using it as an executor, but my question is why is YARN letting the other 7 nodes just sit idle?

It's annoying for obvious reasons - I have to pay for the resources even though they're not being used, and my job hasn't sped up at all!

Anybody know how YARN decides when to add nodes to running spark jobs? What variables come into play? Memory? V-Cores? Anything?

Thanks in advance!

2
Yes, welcome to the annoying world of YARN! Have you set yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator in the capacity-scheduler.xml?Glennie Helles Sindholt
I have not! I can give that a try (probably not until next week) but I am starting to suspect that Spark itself won't request more nodes than there are at the time it is started - but I could be wrong!retnuH
Good luck :) Personally, I think that YARN - not Spark - is the problem. I have never had any problems with resources not being utilized when I ran Spark in Standalone mode (before EMR 4.x). However, since upgrading to EMR 4.x (and hence YARN) I have had a million problems - including underutilization of cores...Glennie Helles Sindholt

2 Answers

25
votes

Okay, with the help of @sean_r_owen, I was able to track this down.

The problem was this: when setting spark.dynamicAllocation.enabled to true, spark.executor.instances shouldn't be set - an explicit value for that will override dynamic allocation and turn it off. It turns out that EMR sets it in the background if you do not set it yourself. To get the desired behaviour, you need to explicitly set spark.executor.instances to 0.

For the records, here is the contents of one of the files we pass to the --configurations flag when creating an EMR cluster:

[
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
    },

    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        }
    },

    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.executor.instances": "0"
        }
    } 
]

This gives us an EMR cluster where Spark uses all the nodes, including added nodes, when running jobs. It also appears to use all/most of the memory and all (?) the cores.

(I'm not entirely sure that it's using all the actual cores; but it is definitely using more than 1 VCore, which it wasn't before, but following Glennie Helles's advice it is now behaving better and using half of the listed VCores, which seems to equal the actual number of cores...)

2
votes

I observed the same behavior with nearly the same settings using emr-5.20.0. I didn't try to add nodes when the cluster is already running but using TASK nodes (together with just one CORE node). I'm using InstanceFleets to define MASTER, CORE and TASK nodes (with InstanceFleets I don't know which exact InstanceTypes I get and that is why I don't want to define the number of executors, cores and memory per executor myself but want that to be maximized/optimized automatically).

With this, it only uses two TASK nodes (probably the first two nodes which are ready to use?) but never scales up while more TASK nodes get provisioned and finishing the bootstrap phase.

What made it work in my case was to set the spark.default.parallelism parameter (to the number of total number of cores of my TASK nodes), which is the same number used for the TargetOnDemandCapacity or TargetSpotCapacity of the TASK InstanceFleet:

[
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
    },
    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        }
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.default.parallelism", <Sum_of_Cores_of_all_TASK_nodes>
        }
    } 
]

For the sake of completeness: I'm using one CORE node and several TASK nodes mainly to make sure the cluster has at least 3 nodes (1 MASTER, 1 CORE and at least one TASK node). Before I tried to used only CORE nodes, but as in my case the number of cores is calculated depending on the actual task it was possible to end up with a cluster consisting of just one MASTER and one CORE node. Using the maximizeResourceAllocation option such a cluster runs for ever doing nothing because the executor running the yarn application master is occupying that single CORE node completely.