1 vote

I have a Dataproc cluster:

master - 6 cores | 32g

worker{0-7} - 6 cores | 32g

Maximum allocation (per YARN container): memory 24576 MB, vCores 6

I have two Spark streaming jobs to submit, one after another.

First, I tried submitting with the default configuration (spark.dynamicAllocation.enabled=true).

In about 30% of cases, the first job grabbed almost all of the available memory and the second one sat in the queue waiting for resources for ages. (The first job is a streaming job that only used a small portion of its resources in each batch.)

My second try was to disable dynamic allocation. I submitted the same two jobs with identical configurations:

spark.dynamicAllocation.enabled=false
spark.executor.memory=12g
spark.executor.cores=3
spark.executor.instances=6
spark.driver.memory=8g

Surprisingly, in the YARN UI I saw:

7 Running Containers with 84g Memory allocation for the first job.

3 Running Containers with 36g Memory allocation and 72g Reserved Memory for the second job.

In the Spark UI there are 6 executors plus the driver for the first job, and 2 executors plus the driver for the second job.

After retrying (killing the previous jobs and submitting the same ones again) without dynamic allocation and with the same configurations, I got a totally different result:

5 containers and 59g Memory allocation for each job, plus 71g Reserved Memory for the second job. In the Spark UI I see 4 executors plus the driver in both cases.

I have a couple of questions:

  1. If dynamicAllocation=false, why is the number of YARN containers different from the number of executors? (At first I thought the additional YARN container was the driver, but its memory is different.)
  2. If dynamicAllocation=false, why doesn't YARN create containers according to my exact requirements, i.e. 6 containers (Spark executors) for each job? And why do two attempts with the same configuration lead to different results?
  3. If dynamicAllocation=true, how can a Spark job that consumes little memory take control of all the YARN resources?

Thanks

2 Answers

4 votes

Spark and YARN scheduling are pretty confusing. I'm going to answer the questions in reverse order:

3) You should not be using dynamic allocation in Spark streaming jobs.

The issue is that Spark continuously asks YARN for more executors as long as there's a backlog of tasks to run. Once a Spark job gets an executor, it keeps it until the executor is idle for 1 minute (configurable, of course). In batch jobs, this is okay because there's generally a large, continuous backlog of tasks.
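
For reference, the knobs that control this behavior (values shown are the Spark defaults, as far as I remember):

spark.dynamicAllocation.executorIdleTimeout=60s
spark.dynamicAllocation.schedulerBacklogTimeout=1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1s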

However, in streaming jobs, there's a spike of tasks at the start of every micro-batch, but executors are actually idle most of the time. So a streaming job will grab a lot of executors that it doesn't need.

To fix this, the old streaming API (DStreams) has its own version of dynamic allocation: https://issues.apache.org/jira/browse/SPARK-12133. This JIRA has more background on why Spark's batch dynamic allocation algorithm isn't a good fit for streaming.
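
If I recall correctly, the DStream variant is switched on with its own (largely undocumented) flag rather than the regular spark.dynamicAllocation.enabled:

spark.streaming.dynamicAllocation.enabled=true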

However, Spark Structured Streaming (likely what you're using) does not support dynamic allocation: https://issues.apache.org/jira/browse/SPARK-24815.

tl;dr Spark requests executors based on its task backlog, not based on memory used.

1 & 2) @Vamshi T is right. Every YARN application has an "Application Master", which is responsible for requesting containers for the application. Each of your Spark jobs has an app master that proxies requests for containers from the driver.

Your configuration doesn't seem to match what you're seeing in YARN, so I'm not sure what's going on there. You have 8 workers with 24g each given to YARN. With 12g executors, you should have 2 executors per node, for a total of 16 "slots". An app master + 6 executors is 7 containers per application, so both applications should fit within the 16 slots.
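
The rough math behind that claim (ignoring the per-executor memory overhead that YARN also counts against each container):

8 workers x 24g            = 192g available to YARN
24g per node / 12g         = 2 executor-sized slots per node -> 16 slots total
1 app master + 6 executors = 7 containers per application
2 applications x 7         = 14 containers <= 16 slots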

We configure the app master to have less memory, which is why the total memory for an application isn't a clean multiple of 12g.

If you want both applications to schedule all their executors concurrently, you should set spark.executor.instances=5.

Assuming you're using structured streaming, you could also just run both streaming jobs in the same Spark application (submitting them from different threads on the driver).
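
A minimal sketch of that approach in Scala, assuming toy rate/console sources and sinks and placeholder checkpoint paths just to show the mechanics; in Structured Streaming, writeStream.start() returns immediately and each query gets its own driver-side scheduling thread, so you don't even need to manage threads yourself:

import org.apache.spark.sql.SparkSession

object TwoStreamsOneApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("two-streams-one-app")
      .getOrCreate()

    // Stand-ins for your real sources.
    val stream1 = spark.readStream.format("rate").option("rowsPerSecond", "10").load()
    val stream2 = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    // start() is non-blocking; both queries share the same driver and the same executors,
    // so YARN sees a single application instead of two competing ones.
    val q1 = stream1.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/q1") // placeholder path
      .start()

    val q2 = stream2.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/q2") // placeholder path
      .start()

    // Block the main thread until either query stops.
    spark.streams.awaitAnyTermination()
  }
}

Because both queries run inside one SparkSession, they draw from the same executor pool, so you size the application once instead of splitting the cluster between two YARN applications.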

1 vote

I have noticed similar behavior in my experience as well, and here is what I observed. First, the resources YARN allocates depend on what is available on the cluster when a job is submitted. When both jobs are submitted at almost the same time with the same config, YARN distributes the available resources equally between them. Once you throw dynamic allocation into the mix, things get a little more confusing/complex. Now, in your case:

7 Running Containers with 84g Memory allocation for the first job -- you got 7 containers because you requested 6 executors: one container per executor, plus an extra container for the application master.

3 Running Containers with 36g Memory allocation and 72g Reserved Memory for the second job -- since the second job was submitted a little later, YARN allocated the remaining resources: 2 containers, one per executor, plus the extra one for your application master.

The number of containers will never match the number of executors you requested; it will always be one more, because one container is needed to run your application master.

Hope that answers part of your question.