I have a Spark Python application that is being killed by YARN for exceeding memory limits. One step involves loading some fairly heavy resources (500+ MB), so I'm using mapPartitions so that they get loaded once per partition rather than once per record. Basically:
def process_and_output(partition):
    resources = load_resources()
    for record in partition:
        yield transform_record(resources, record)

input = sc.textFile(input_location)
processed = input.mapPartitions(process_and_output)
processed.saveAsTextFile(output_location)
When running, I consistently get this error:
ERROR YarnScheduler: Lost executor 1 on (address removed): Container killed by YARN for exceeding memory limits. 11.4 GB of 11.2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
I tried boosting memoryOverhead extremely high, but I still hit the same issue. I ran with:
--conf "spark.python.worker.memory=1200m" \
--conf "spark.yarn.executor.memoryOverhead=5300" \
--conf "spark.executor.memory=6g" \
Surely, that's enough memoryOverhead?
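Doing the arithmetic on what I requested (assuming the limit YARN enforces is simply spark.executor.memory plus spark.yarn.executor.memoryOverhead):

executor_memory_mb = 6 * 1024    # spark.executor.memory = 6g
memory_overhead_mb = 5300        # spark.yarn.executor.memoryOverhead = 5300
container_limit_mb = executor_memory_mb + memory_overhead_mb
print(container_limit_mb / 1024.0)   # ~11.2, which matches the limit YARN reports

If that's the right way to read it, the 11.2 GB ceiling in the error is exactly the container size I asked for, so the processes in the container really are using more than I budgeted, rather than the limit being set too low.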
More generally, I'm struggling to understand how the Python worker's memory is controlled and how it is counted towards the overall container total. Is there any documentation of this?
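For what it's worth, this is my current mental model of how that container budget is split up; the question marks are the parts I'm unsure about, and the spill-threshold reading of spark.python.worker.memory is my assumption, not something I've found documented:

# YARN container limit ≈ spark.executor.memory + spark.yarn.executor.memoryOverhead
#   JVM heap                   <- spark.executor.memory (6g)
#   off-heap / native memory   <- spark.yarn.executor.memoryOverhead (5300m)
#     Python worker processes? <- is this where their memory is counted?
#
# spark.python.worker.memory (1200m): I believe this is only the threshold at which
# a Python worker spills aggregation data to disk, not a hard cap on the process.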
I'd also like to understand whether using a generator function actually cuts down on memory usage. Will it stream data through the Python process (as I'm hoping), or will it buffer everything before sending it back to the JVM/Spark side?
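To make that last question concrete, this is the distinction I mean (the second function is only for illustration, it's not code I'm running):

def process_streaming(partition):
    # What I'm hoping happens: resources are loaded once, then each transformed
    # record is yielded lazily, so only a few records sit in Python memory at a time.
    resources = load_resources()
    for record in partition:
        yield transform_record(resources, record)

def process_buffered(partition):
    # What I'm worried is effectively happening: the whole partition's output is
    # materialized inside the Python worker before being handed back to the JVM.
    resources = load_resources()
    return [transform_record(resources, record) for record in partition]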