10
votes

I have a Spark Python application that is being killed by YARN for exceeding memory limits. One step involves loading some resources that are fairly heavy (500+ MB), so I'm using mapPartitions. Basically:

def process_and_output(partition):
    # load the heavy (500+ MB) resources once per partition, not once per record
    resources = load_resources()
    for record in partition:
        yield transform_record(resources, record)

input = sc.textFile(input_location)
processed = input.mapPartitions(process_and_output)
processed.saveAsTextFile(output_location)

When running, I consistently get this error:

ERROR YarnScheduler: Lost executor 1 on (address removed): Container killed by YARN for exceeding memory limits. 11.4 GB of 11.2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

I tried boosting memoryOverhead quite high, but I still hit the same issue. I ran with:

--conf "spark.python.worker.memory=1200m" \
--conf "spark.yarn.executor.memoryOverhead=5300" \
--conf "spark.executor.memory=6g" \

Surely, that's enough memoryOverhead?

I guess more generally, I'm struggling to understand how the Python worker's memory is controlled/counted in the overall total. Is there any documentation of this?

I'd also like to understand whether using a generator function will actually cut down on memory usage. Will it stream data through the Python process (as I'm hoping), or will it buffer it all before sending it back to the JVM/Spark infrastructure?
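To make that last question concrete, here is the contrast I'm wondering about (same placeholder functions as in the snippet above):

def process_and_output_streaming(partition):
    # what I have now: yield one transformed record at a time, which I hope
    # means records stream back to the JVM as they are produced
    resources = load_resources()
    for record in partition:
        yield transform_record(resources, record)

def process_and_output_buffered(partition):
    # the behaviour I'm worried about: the whole partition's output is
    # materialized in Python memory before anything is sent back
    resources = load_resources()
    return [transform_record(resources, record) for record in partition]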

3 Answers

5
votes

YARN kills executors when their

memory usage > (executor-memory + executor.memoryOverhead)

Given your settings, that looks like a legitimate kill:

(memory usage) 11.4 GB > 11.18 GB (executor-memory = 6 GB + memoryOverhead = 5300 MB ≈ 5.18 GB)
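A quick sanity check of that arithmetic in plain Python (values in MB):

executor_memory_mb = 6 * 1024   # spark.executor.memory=6g
memory_overhead_mb = 5300       # spark.yarn.executor.memoryOverhead=5300

container_limit_mb = executor_memory_mb + memory_overhead_mb
print(container_limit_mb / 1024.0)         # ~11.18 -- the "11.2 GB" limit YARN reports
print(11.4 > container_limit_mb / 1024.0)  # True -> container gets killed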

Try with:

--conf "spark.yarn.executor.memoryOverhead=6144"

3
votes

As you can see, 11.2 GB is the maximum memory for a container created by YARN. It is equal to executor memory + overhead, so the Python worker's memory is not accounted for in those settings.

The exception tells you to increase overhead, but you can also just increase executor-memory without increasing memoryOverhead. That's all I can say without knowing why you need that much memory in a single executor; maybe a cartesian join or something similar really does require that much memory.
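For example, here is a sketch of what that could look like if you create the SparkContext yourself rather than via spark-submit flags; the 8g figure is only illustrative, pick whatever fits your cluster:

from pyspark import SparkConf, SparkContext

# illustrative only: give the executor JVM itself more memory instead of
# pushing memoryOverhead ever higher
conf = (SparkConf()
        .setAppName("process_and_output")
        .set("spark.executor.memory", "8g"))  # was 6g; overhead left as-is
sc = SparkContext(conf=conf)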

2
votes

Two and a half years later... I happened to be reading the Spark release notes and saw this:

Add spark.executor.pyspark.memory limit

With this linked JIRA issue: https://issues.apache.org/jira/browse/SPARK-25004

I've long since worked around my original issue and have since changed jobs, so I no longer have a way to try this out, but I suspect this may have been the exact problem I was having.
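If anyone wants to try it, here is a sketch of how I would expect it to be used (assuming Spark 2.4+, where SPARK-25004 landed; the 2g value is just a placeholder):

from pyspark import SparkConf, SparkContext

# spark.executor.pyspark.memory puts an explicit cap on each Python worker,
# so the Python side of the job is budgeted for instead of quietly eating
# into the YARN container's headroom
conf = (SparkConf()
        .set("spark.executor.memory", "6g")
        .set("spark.executor.pyspark.memory", "2g"))
sc = SparkContext(conf=conf)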