ErrorMessage': 'An error occurred while calling o103.pyWriteDynamicFrame.
Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most
recent failure: Lost task 0.3 in stage 5.0
(TID 131, ip-1-2-3-4.eu-central-1.compute.internal, executor 20):
ExecutorLostFailure (executor 20 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of
5.5 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead or disabling
yarn.nodemanager.vmem-check-enabled because of YARN-4714.
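For reference, the overhead the message refers to is a per-executor setting that has to be in place before the SparkContext is created. A minimal sketch of raising it programmatically, assuming Glue honors it at all (on Standard workers the executor memory profile is largely fixed by the worker type):

from pyspark import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Request extra off-heap headroom per executor (value in MB); the key must be
# set before the SparkContext exists to affect the YARN container request.
conf = SparkConf().set("spark.yarn.executor.memoryOverhead", "1024")
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)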
The job does the following (pseudocode; see the note on repartition vs. coalesce right after this list):
1. Reads the CSV into a DynamicFrame `dynf`
2. `dynf.toDF().repartition(100)`
3. `Map.apply(dynf, tf)` (`tf` being a function applied to every row)
4. `dynf.toDF().coalesce(10)`
5. Writes `dynf` as Parquet to S3
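For context on steps 2 and 4: `repartition` performs a full shuffle into the requested number of partitions, while `coalesce` only merges existing partitions without a full shuffle, so each remaining task ends up holding several of the former partitions. A minimal illustration in plain PySpark (`df` stands for any loaded DataFrame):

# Full shuffle: rows are redistributed roughly evenly across 100 partitions.
df_wide = df.repartition(100)

# Narrow merge: the 100 partitions are combined down to 10 without a full
# shuffle, so each of the 10 tasks holds about 10 of the former partitions.
df_narrow = df_wide.coalesce(10)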
This job has been executed successfully dozens of times with an identical Glue setup (Standard worker with MaxCapacity of 10.0), and re-running it on a CSV it failed on usually succeeds without any adjustments. In other words: it works. Not only that, it has even run successfully on much larger CSVs than the ones it failed on.
That is what I mean by erratic: I don't see a pattern such as "if the CSV is larger than X, then I need more workers".
Does anybody have an idea what might cause this error, which occurs somewhat randomly?
The relevant part of the code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
# s3://bucket/path/object
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'SOURCE_BUCKET',  # "bucket"
    'SOURCE_PATH',    # "path/"
    'OBJECT_NAME',    # "object"
    'TARGET_BUCKET',  # "bucket"
    'TARGET_PATH',    # "path/"
    'PARTS_LOAD',
    'PARTS_SAVE'
])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Step 1: read the CSV into a DynamicFrame
data_DYN = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [
            "s3://{sb}/{sp}{on}".format(
                sb=args['SOURCE_BUCKET'],
                sp=args['SOURCE_PATH'],
                on=args['OBJECT_NAME']
            )
        ]
    },
    format_options={
        "withHeader": True,
        "separator": ","
    }
)
# Step 2: repartition the loaded data
data_DF = data_DYN.toDF().repartition(int(args["PARTS_LOAD"]))
data_DYN = DynamicFrame.fromDF(data_DF, glueContext, "data_DYN")
# Step 3: tf is applied to every record via Map.apply below
def tf(rec):
    # functions applied to elements of rec
    return rec
data_DYN_2 = Map.apply(data_DYN, tf)
cols = [
    'col1', 'col2', ...
]
data_DYN_3 = SelectFields.apply(data_DYN_2, cols)
# Step 4: cache the selected columns, then coalesce for the write
data_DF_3 = data_DYN_3.toDF().cache()
data_DF_4 = data_DF_3.coalesce(int(args["PARTS_SAVE"]))
data_DYN_4 = DynamicFrame.fromDF(data_DF_4, glueContext, "data_DYN_4")
# Step 5: write the result as partitioned Parquet to S3
datasink = glueContext.write_dynamic_frame.from_options(
    frame=data_DYN_4,
    connection_type="s3",
    connection_options={
        "path": "s3://{tb}/{tp}".format(tb=args['TARGET_BUCKET'], tp=args['TARGET_PATH']),
        "partitionKeys": ["col_x", "col_y"]
    },
    format="parquet",
    transformation_ctx="datasink"
)
job.commit()
Comments:
"`dynf = dynf.toDF().repartition(100).cache()` (I would also cache after step 4)" – Grzegorz Skibinski
"`pyWriteDynamicFrame`, which corresponds to step 5." – Raffael
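For clarity, the placement the first comment suggests would look roughly like this in the job above (a sketch of the suggestion, not a verified fix):

# Step 2: repartition and cache the freshly loaded DataFrame
data_DF = data_DYN.toDF().repartition(int(args["PARTS_LOAD"])).cache()

# ... Map.apply / SelectFields unchanged ...

# Step 4: additionally cache after the coalesce, so the write in step 5
# (pyWriteDynamicFrame) works from materialized partitions
data_DF_4 = data_DF_3.coalesce(int(args["PARTS_SAVE"])).cache()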