0 votes

I started seeing the following error after deploying some changes to a Spark SQL query in an AWS Glue Spark 2.2.1 environment:

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 164 tasks (1031.4 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

I tried disabling broadcast joins with set("spark.sql.autoBroadcastJoinThreshold", "-1") and increasing maxResultSize (which caused other errors), but the problem persisted until I replaced the following join

X left outer join Y on array_contains(X.ids, Y.id)

with

val sqlDF = spark.sql("select * from X lateral view explode(ids) t as id")
sqlDF.createOrReplaceTempView("X_exploded")
...
X_exploded left outer join Y on X_exploded.id = Y.id
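Putting the pieces together, a minimal sketch of the rewrite (including the broadcast-join setting mentioned earlier) looks roughly like this; it assumes X and Y are already registered as temp views and that the elements of X.ids have the same type as Y.id, which isn't spelled out above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ExplodeJoinSketch") // hypothetical app name
  // Disable broadcast joins, as attempted in the question.
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .getOrCreate()

// Turn each element of X.ids into its own row.
val sqlDF = spark.sql("select * from X lateral view explode(ids) t as id")
sqlDF.createOrReplaceTempView("X_exploded")

// Join on the exploded scalar column instead of array_contains.
val joined = spark.sql(
  "select * from X_exploded left outer join Y on X_exploded.id = Y.id")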

I am using the AWS Glue managed environment and don't have access to the query plan. However, I am curious: why would joining on array_contains cause more data to be brought to the driver than exploding and joining on an exact match?

Note that table X contains 350 KB of data in json/gzip format and table Y contains about 50 GB in json/zip format.

Thanks!

Can you share the Spark plan of your query? – Constantine
Quoting from my OP: "I am using the AWS Glue managed environment and don't have access to the query plan." – alecswan

2 Answers

0 votes

It appears that your earlier approach brings in all the values from Y whenever the array_contains function returns true.

In your later approach, explode creates a new row for each array element, which eliminates duplicates and ultimately reduces the number of rows returned.
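As a rough illustration on a toy dataset (hypothetical data, not yours), explode turns the array into one row per element, so the subsequent join becomes a plain equi-join on a scalar column:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ExplodeDemo")
  .master("local[*]") // local sketch only
  .getOrCreate()
import spark.implicits._

// One X row whose ids array has three elements.
Seq((1, Seq("a", "b", "c"))).toDF("x_key", "ids").createOrReplaceTempView("X")

// Yields one row per element: (1, "a"), (1, "b"), (1, "c").
spark.sql("select x_key, id from X lateral view explode(ids) t as id").show()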

0 votes

You can pass --conf spark.driver.maxResultSize=4g on the command line to increase the maximum result size.
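For example, with spark-submit (the class and jar names below are placeholders):

spark-submit \
  --conf spark.driver.maxResultSize=4g \
  --class com.example.YourJob \
  your-job.jar

Note that this only raises the ceiling; if the collected results are genuinely too large for the driver, the failure may simply move to driver memory, so restructuring the query (as in the accepted rewrite above) can still be necessary.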