I think my AWS Glue job is running out of memory and failing while writing Parquet output:

An error occurred while calling o126.parquet. Job aborted due to stage failure: Task 82 in stage 9.0 failed 4 times, most recent failure: Lost task 82.3 in stage 9.0 (TID 17400, ip-172-31-8-70.ap-southeast-1.compute.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

A more complete log is below:

Traceback (most recent call last): File "script_2019-01-29-06-53-53.py", line 71, in .parquet("s3://.../flights2") File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o126.parquet. : org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92) at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609) at 
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217) at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:508) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 82 in stage 9.0 failed 4 times, most recent failure: Lost task 82.3 in stage 9.0 (TID 17400, ip-172-31-8-70.ap-southeast-1.compute.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)

It appears the failing line is:

.parquet("s3://pinfare-glue/flights2")

My Glue job is shown below. Is there any way I can resolve this? I am considering removing some folders from S3 so that Glue processes the data in batches, but this is not scalable.

Another option might be to create a DataFrame for each date and write these smaller partitions in a loop, but would that be very slow?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import regexp_replace, to_timestamp

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print(">>> READING ...")
inputGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "flights", transformation_ctx="inputGDF")
# inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://pinfare-actuary-storage-csv"], "recurse": True}, format = "csv", format_options = {"withHeader": True}, transformation_ctx="inputGDF")
print(">>> DONE READ ...")

flightsDf = inputGDF.toDF()
if bool(flightsDf.head(1)):
    df = flightsDf \
        .drop("createdat") \
        .drop("updatedat") \
        .withColumn("agent", flightsDf["agent"].cast("int")) \
        .withColumn("querydestinationplace", flightsDf["querydestinationplace"].cast("int")) \
        .withColumn("querydatetime", regexp_replace(flightsDf["querydatetime"], "-", "").cast("int")) \
        .withColumn("queryoutbounddate", regexp_replace(flightsDf["queryoutbounddate"], "-", "").cast("int")) \
        .withColumn("queryinbounddate", regexp_replace(flightsDf["queryinbounddate"], "-", "").cast("int")) \
        .withColumn("outdeparture", to_timestamp(flightsDf["outdeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
        .withColumn("outarrival", to_timestamp(flightsDf["outarrival"], "yyyy-MM-dd'T'HH:mm:ss")) \
        .withColumn("indeparture", to_timestamp(flightsDf["indeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
        .withColumn("inarrival", to_timestamp(flightsDf["inarrival"], "yyyy-MM-dd'T'HH:mm:ss"))

    df.createOrReplaceTempView("flights")

    airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "airports")
    airportsDF = airportsGDF.toDF()
    airportsDF.createOrReplaceTempView("airports")

    agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "agents")
    agentsRawDF = agentsGDF.toDF()
    agentsRawDF.createOrReplaceTempView("agents_raw")

    agentsDF = spark.sql("""
        SELECT id, name, type FROM agents_raw
        WHERE type IN ('Airline', 'TravelAgent')
    """) 
    agentsDF.createOrReplaceTempView("agents")

    finalDf = spark.sql("""
            SELECT /*+ BROADCAST(agents) */ /*+ BROADCAST(airports) */
                f.*, countryName, cityName, airportName, a.name AS agentName,
                CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
            FROM flights f
            LEFT JOIN agents a
            ON f.agent = a.id
            LEFT JOIN airports p
            ON f.querydestinationplace = p.airportId
        """)
    print(">>> DONE PROCESS FLIGHTS")

    print("Writing ...")
    finalDf \
      .write \
      .mode("append") \
      .partitionBy(["countryName", "querydatetime"]) \
      .parquet("s3://.../flights2")
else:
    print("Nothing to write ...")

job.commit()

import boto3
glue_client = boto3.client('glue', region_name='ap-southeast-1')
glue_client.start_crawler(Name='...')
What is the total size of your source data, and how many DPUs does your Glue job use? Have you tried setting spark.yarn.executor.memoryOverhead=7g, as explained in stackoverflow.com/questions/49034126/…? - Prabhakar Reddy
@bdcloud let me try this, but others there seem to suggest it either does not work or cannot be used. In any case, it does not sound like a very scalable solution. Does PySpark not stream the writes, so that memory can be reused? - Jiew Meng
It really depends on the data size; unless you want to process hundreds of TBs, you are good to go. - Prabhakar Reddy
If your LEFT JOIN has a 1:N mapping, it will multiply the number of rows in the DataFrame, which may cause an OOM error. In Glue, there is no provision to set up your own infrastructure configuration, e.g. 64 GB of memory per vCPU. If that is the case, first try the spark.yarn.executor.memoryOverhead option and/or increasing the DPUs. Otherwise, you have to bucket the data using a pushdown predicate and then run a for loop over all the data. - Sandeep Fatangare
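
For reference, the memory-overhead and DPU suggestions from the comments could be tried roughly as follows. This is only a sketch: the job name `my-glue-job` and the capacity value are hypothetical, and passing Spark settings through a `--conf` job argument is a commonly reported workaround rather than something this thread confirms works for every Glue version.

```python
def job_run_kwargs(job_name, overhead="7g", max_capacity=None):
    """Build keyword arguments for the boto3 Glue client's start_job_run call."""
    kwargs = {
        "JobName": job_name,
        # Glue forwards job arguments to the job; "--conf" is the
        # workaround key for Spark properties (assumption, see note above).
        "Arguments": {
            "--conf": "spark.yarn.executor.memoryOverhead={}".format(overhead)
        },
    }
    if max_capacity is not None:
        # Number of DPUs to allocate for this run.
        kwargs["MaxCapacity"] = float(max_capacity)
    return kwargs


# "my-glue-job" and 20 DPUs are placeholder values for illustration.
kwargs = job_run_kwargs("my-glue-job", overhead="7g", max_capacity=20)

# With boto3 installed and credentials configured, the run would be started with:
#   boto3.client("glue", region_name="ap-southeast-1").start_job_run(**kwargs)
```

Raising the overhead only moves the limit; if the join multiplies rows, the job may still exceed it on a larger input.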

1 Answer

If your LEFT JOIN has a 1:N mapping, it will multiply the number of rows in the DataFrame, which may cause an OOM error. In Glue, there is no provision to set up your own infrastructure configuration, e.g. 64 GB of memory per vCPU. If that is the case, first try the spark.yarn.executor.memoryOverhead option and/or increasing the DPUs. Otherwise, you have to bucket the data using a pushdown predicate and then run a for loop over all the data.
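
A minimal sketch of the pushdown-predicate loop, assuming the `flights` catalog table is partitioned by a date column. The column name `querydatetime`, its `YYYY-MM-DD` string format, and the helpers `daily_predicates`, `process_in_batches` and `transform` are all hypothetical; a pushdown predicate only prunes data when it refers to actual partition columns of the catalog table.

```python
from datetime import date, timedelta


def daily_predicates(start, end, column="querydatetime"):
    """Build one pushdown predicate string per day in [start, end]."""
    predicates = []
    day = start
    while day <= end:
        predicates.append("{} = '{}'".format(column, day.isoformat()))
        day += timedelta(days=1)
    return predicates


def process_in_batches(glue_context, predicates, transform, output_path):
    """Read, transform and append one predicate-sized batch at a time."""
    for predicate in predicates:
        batch_df = glue_context.create_dynamic_frame.from_catalog(
            database="pinfare",
            table_name="flights",
            push_down_predicate=predicate,  # prunes partitions before reading
        ).toDF()
        if not batch_df.head(1):
            continue  # nothing in this partition
        # transform is a caller-supplied function holding the casts and joins.
        (transform(batch_df)
            .write
            .mode("append")
            .partitionBy("countryName", "querydatetime")
            .parquet(output_path))


# Example: three daily batches (the dates are placeholders).
predicates = daily_predicates(date(2019, 1, 1), date(2019, 1, 3))
```

Inside the Glue job, `process_in_batches(glueContext, predicates, transform, "s3://.../flights2")` would then replace the single large write, so each iteration only holds one day's data. Whether this is fast enough depends on how many partitions there are and on the per-batch startup cost.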