ErrorMessage': 'An error occurred while calling o103.pyWriteDynamicFrame. 
Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most
recent failure: Lost task 0.3 in stage 5.0 
(TID 131, ip-1-2-3-4.eu-central-1.compute.internal, executor 20): 
ExecutorLostFailure (executor 20 exited caused by one of the running tasks) 
Reason: Container killed by YARN for exceeding memory limits.  5.5 GB of 
5.5 GB physical memory used. Consider boosting 
spark.yarn.executor.memoryOverhead or disabling 
yarn.nodemanager.vmem-check-enabled because of YARN-4714.

The job does the following (pseudocode):

  1. Reads the CSV into a DynamicFrame dynf
  2. dynf.toDF().repartition(100)
  3. Map.apply(dynf, tf) # tf is a function applied to every row
  4. dynf.toDF().coalesce(10)
  5. Writes dynf as Parquet to S3

This job has run successfully dozens of times with an identical Glue setup (Standard worker type, MaxCapacity of 10.0), and re-running it on a CSV it failed on usually succeeds without any changes. In other words, it generally works. Moreover, the job has run successfully on much larger CSVs than the ones it failed on.

That is what I mean by erratic: I don't see a pattern such as "if the CSV is larger than X, I need more workers".

Does anybody have an idea what might cause this error, which occurs seemingly at random?


The relevant part of the code:

import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

# s3://bucket/path/object
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'SOURCE_BUCKET', # "bucket"
    'SOURCE_PATH',   # "path/"
    'OBJECT_NAME',   # "object"
    'TARGET_BUCKET', # "bucket"
    'TARGET_PATH',   # "path/"
    'PARTS_LOAD',
    'PARTS_SAVE'
])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

data_DYN = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={
        "paths":[
            "s3://{sb}/{sp}{on}".format(
                sb=args['SOURCE_BUCKET'],
                sp=args['SOURCE_PATH'],
                on=args['OBJECT_NAME']
            )
        ]
    },
    format_options={
        "withHeader": True,
        "separator": ","
    }
)

data_DF = data_DYN.toDF().repartition(int(args["PARTS_LOAD"]))
data_DYN = DynamicFrame.fromDF(data_DF, glueContext, "data_DYN")

def tf(rec):
    # functions applied to elements of rec
    return rec

data_DYN_2 = Map.apply(data_DYN, tf)

cols = [
    'col1', 'col2', ...
]

data_DYN_3 = SelectFields.apply(data_DYN_2, cols)

data_DF_3 = data_DYN_3.toDF().cache()

data_DF_4 = data_DF_3.coalesce(int(args["PARTS_SAVE"]))
data_DYN_4 = DynamicFrame.fromDF(data_DF_4, glueContext, "data_DYN_4")

datasink = glueContext.write_dynamic_frame.from_options(
    frame = data_DYN_4, 
    connection_type = "s3", 
    connection_options = {
        "path": "s3://{tb}/{tp}".format(tb=args['TARGET_BUCKET'],tp=args['TARGET_PATH']),
        "partitionKeys": ["col_x","col_y"]
    }, 
    format = "parquet",
    transformation_ctx = "datasink"
)

job.commit()
Could you point out where stage 5.0 is in your pseudocode? - mazaneicha
What about executor and driver memory? (Map should be run by the executors.) Also, caching might be an option for you, e.g. in step 2: dynf = dynf.toDF().repartition(100).cache() (I would also cache after step 4). - Grzegorz Skibinski
@mazaneicha: the error message refers to pyWriteDynamicFrame, which corresponds to step 5. - Raffael
@GrzegorzSkibinski: every node has 16 GB of RAM, while the entire CSV is about 3 to 4 GB. Why do you think I should cache after the repartitioning? The funny thing is that the same code worked better with even larger CSVs (up to 8 GB) and the same cluster configuration. I just had several independent clusters fail in a row with those smaller CSVs. The problem seems so inconsistent. - Raffael

1 Answer

I suspect .coalesce(10) is the culprit: it reduces the number of partitions from 100 to 10 without rebalancing the data across them, so some of the resulting partitions can end up much larger than others. Using .repartition(10) instead might fix it, at the expense of an extra shuffle.
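
A minimal sketch of that change, assuming the DataFrame and argument names from the question (data_DF_3, PARTS_SAVE). The helper name rebalance_for_write is hypothetical and not part of Glue or Spark; it only wraps DataFrame.repartition, which performs a full shuffle, in place of coalesce, which does not. Whether this removes the memory error depends on how the rows are actually distributed, so treat it as something to try rather than a guaranteed fix.

```python
def rebalance_for_write(df, num_parts):
    """Return df redistributed over num_parts partitions via a full shuffle.

    Hypothetical helper: it calls DataFrame.repartition instead of
    DataFrame.coalesce, so the rows are spread roughly evenly over the
    output partitions rather than merged from existing partitions as-is.
    """
    num_parts = int(num_parts)  # Glue job arguments arrive as strings
    if num_parts < 1:
        raise ValueError("num_parts must be a positive integer")
    return df.repartition(num_parts)


# Possible use in the job from the question, replacing the coalesce step:
# data_DF_4 = rebalance_for_write(data_DF_3, args["PARTS_SAVE"])
# data_DYN_4 = DynamicFrame.fromDF(data_DF_4, glueContext, "data_DYN_4")
```

The extra shuffle costs some runtime, but it also puts a stage boundary before the write, so the write tasks no longer pull in the work of the preceding narrow transformations.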