1
votes

I have the following job script that is throwing errors when there are lots of files to process.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Resolve the parameters Glue passes to the job on the command line.
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'ENVIRONMENT', 'WORK_BUCKET_NAME', 'OUTPUT_BUCKET_NAME'],
)

# Build the Spark/Glue contexts and initialise the job.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

env = args['ENVIRONMENT']
work_bucket_name = args['WORK_BUCKET_NAME']
output_bucket_name = args['OUTPUT_BUCKET_NAME']

# S3 locations used further down: the Relationalize staging area and the
# final parquet output.
staging_path = f"s3://{work_bucket_name}/{env}/edocs/relationalize-temp/esocial_s_2200"
output_path = f"s3://{output_bucket_name}/{env}/anonymous/edocs/esocial_s-2200"

# Step 0: read the source table from the Glue Data Catalog.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=f"{env}_raw_edocs",
    table_name="esocial_s_2200",
    transformation_ctx="datasource0",
)

# Step 1: keep the five columns of interest, each mapped string -> string
# under its own name.
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[
        (column, "string", column, "string")
        for column in ("esocial", "tenant", "year", "month", "day")
    ],
    transformation_ctx="applymapping1",
)

# Step 2: resolve ambiguous column types using the "make_struct" strategy.
resolvechoice2 = ResolveChoice.apply(
    frame=applymapping1,
    choice="make_struct",
    transformation_ctx="resolvechoice2",
)

# Step 3: drop null fields.
dropnullfields3 = DropNullFields.apply(
    frame=resolvechoice2,
    transformation_ctx="dropnullfields3",
)

# Step 4: parse the "esocial" string column as JSON.
unbox4 = Unbox.apply(frame=dropnullfields3, path="esocial", format="json")

# Step 5: flatten the nested structure; the top-level table is named "root".
relationalize5 = Relationalize.apply(
    frame=unbox4,
    staging_path=staging_path,
    name="root",
    transformation_ctx="relationalize5",
)

# Only write (and commit the job) when the root table has at least one column.
root_frame = relationalize5.select('root')
if len(root_frame.toDF().schema) > 0:
    datasink8 = glueContext.write_dynamic_frame.from_options(
        frame=root_frame,
        connection_type="s3",
        connection_options={
            "path": output_path,
            "partitionKeys": ["tenant", "year", "month", "day"],
        },
        format="parquet",
        transformation_ctx="datasink8",
    )
    job.commit()

The stack trace is:

File "/tmp/raw_edocs_s_2200.py", line 55, in relationalize5 = Relationalize.apply(frame = unbox4, staging_path = f"s3://{work_bucket_name}/{env}/edocs/relationalize-temp/esocial_s_2200", name = "root", transformation_ctx = "relationalize5") File "/opt/amazon/lib/python3.6/site-packages/awsglue/transforms/transform.py", line 24, in apply return transform(*args, **kwargs) File "/opt/amazon/lib/python3.6/site-packages/awsglue/transforms/relationalize.py", line 47, in call return frame.relationalize(name, staging_path, options, transformation_ctx, info, stageThreshold, totalThreshold) File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 344, in relationalize long(stageThreshold), long(totalThreshold))) File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in call answer, self.gateway_client, self.target_id, self.name) File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o97.relationalize. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3190 in stage 1.0 failed 4 times, most recent failure: Lost task 3190.3 in stage 1.0 (TID 9056, 172.36.129.80, executor 1): java.io.FileNotFoundException: /tmp/blockmgr-5e470d53-7285-469b-8eb2-5e1c9b43e02c/1e/rdd_1039_3190 (Too many open files)

My job is configured like this:

  # Glue ETL job that runs the raw_edocs_s_2200.py script.
  GlueS2200RawJob:
    Type: AWS::Glue::Job
    Properties:
      Command:
        Name: "glueetl"
        PythonVersion: 3
        ScriptLocation: !Sub s3://${WorkBucketName}/${Environment}/anonymous/glue_jobs/raw_edocs_s_2200.py
      DefaultArguments:
        # Job bookmarks are enabled for this job.
        "--job-bookmark-option": "job-bookmark-enable"
        # Arguments read by the script through getResolvedOptions.
        "--ENVIRONMENT": !Ref Environment
        "--WORK_BUCKET_NAME": !Ref WorkBucketName
        "--OUTPUT_BUCKET_NAME": !Ref OutputBucketName
      GlueVersion: "2.0"
      Name: !Sub ${Environment}_raw_edocs_s_2200
      NumberOfWorkers: 2
      Role: !Ref GlueS22
      Tags:
        env: !Ref Environment

Does anyone know anything that could help to solve this problem?

1
Can you try calling coalesce on your DataFrame just before the write, to reduce the number of files? – Prabhakar Reddy

1 Answer

0
votes

To solve the problem I needed to coalesce the dataframe and reduce the number of partitions.

# DynamicFrame is needed to convert the coalesced Spark DataFrame back.
from awsglue.dynamicframe import DynamicFrame

# Derive the coalesce target from the executor configuration: the larger of
# (cores per executor * number of executors) and max_partitions.
cores = int(sc.getConf().get('spark.executor.cores'))
instances = int(sc.getConf().get('spark.executor.instances'))
max_partitions = 200

# coalesce() only ever reduces the partition count; it never increases it.
coalesced_df = unbox4.toDF().coalesce(max(cores * instances, max_partitions))

# Wrap the result back into a DynamicFrame using the job's GlueContext
# (named `glueContext` in the job script).
# NOTE(review): presumably `coalesced5` is then passed to Relationalize.apply
# in place of `unbox4` -- confirm against the final job script.
coalesced5 = DynamicFrame.fromDF(coalesced_df, glueContext, 'coalesced5')