2 votes

I am running a Spark program on a Hadoop cluster, which uses the YARN scheduler to run the tasks. However, I notice a strange behavior. YARN sometimes kills a task, complaining of an out-of-memory error, whereas if I execute the tasks in rounds, that is, execute the same number of tasks as containers/executors, let them finish, and then execute the next group of tasks, it runs fine, meaning the tasks aren't using any more memory than the containers allow. So I suspect that YARN is trying to run more than one task in parallel in a container, which is why a container goes out of memory. Is there a way to restrict this behavior and tell YARN to run only one task at a time in a container?

1
Can you add more details about the two approaches you followed, like how many executors and how much executor memory? How are you running the different groups of tasks (is it memory based)? Also, are any other applications running on the cluster that use YARN? – Ramzy
Basically, in the first approach I just use map. In the second approach, I run the program multiple times, each time with the number of tasks equal to the number of executors. When I do that, it works fine, but when I simply use map and run it in one go, it fails. – pythonic

1 Answer

3 votes

In general, each YARN container that Spark requests corresponds directly to one "executor", and even though YARN may report 1 CPU allocated per container, under the hood Spark uses the spark.executor.cores setting to determine the number of concurrent tasks packed into a single executor/container process.

So simply set spark.executor.cores=1, and each YARN container will only work on one task at a time. This can be done either as a spark-submit configuration, like --conf spark.executor.cores=1, or by putting it in conf/spark-defaults.conf (on most standard Hadoop installations this would be /etc/spark/conf/spark-defaults.conf), as shown below.
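
For example, either of the following should pin each executor to a single concurrent task (the application JAR name and the memory value here are just placeholders, not values from your setup):

    # one-off, at submit time
    spark-submit --master yarn \
      --conf spark.executor.cores=1 \
      --conf spark.executor.memory=4g \
      your-app.jar

    # or persistently, in conf/spark-defaults.conf
    spark.executor.cores    1
    spark.executor.memory   4g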

Note that there may still be multiple YARN containers per machine; if you want to further limit things to 1 task at a time per machine, you'd also need to expand spark.executor.memory to the amount of memory available on each machine (the amount allocated to the YARN NodeManager running on that machine; YARN will refuse to pack any container larger than what you've told the NodeManager it's allowed to use, even if physical memory is larger). Or you may find that you simply need to carve up your machine into slightly larger chunks, so you can just play with that memory setting to find the right memory size without sacrificing too much parallelism.
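
As a rough sketch of the one-executor-per-machine variant: suppose each NodeManager advertises 16 GB to YARN (the property name below is the standard one in yarn-site.xml; the numbers are purely illustrative). Then sizing the single executor just below that limit keeps YARN from fitting a second container on the node:

    <!-- yarn-site.xml: memory each NodeManager offers to YARN -->
    <property>
      <name>yarn.nodemanager.resource.memory-mb</name>
      <value>16384</value>
    </property>

    # spark-defaults.conf: one large executor per machine,
    # leaving some headroom for YARN's per-container memory overhead
    spark.executor.cores     1
    spark.executor.memory    14g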