1
votes

I've checked the other posts related to this error and I do not found anything working at all.

What I'm trying to do:

df = spark.sql("""
SELECT DISTINCT
  action.AccountId
  ...
  ,to_date(date) as Date
FROM sc_raw_report LEFT JOIN adwords_accounts ON action.AccountId=sc_raw_report.customer_id
WHERE date >= to_date(concat_ws('-',2018,1,1))
GROUP BY action.AccountId
  ,Account_Name
  ...
  ,to_date(date)

  ,substring(timestamp,12,2)
""")

df.show(5, False)

and then a saveAsTable.. Nonetheless it returns an error:

py4j.protocol.Py4JJavaError: An error occurred while calling o119.showString. : org.apache.spark.SparkException: Exception thrown in awaitResult: [...] Caused by: org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8GB: 13 GB

I've tried the: 'spark.sql.autoBroadcastJoinThreshold': '-1'

But it did nothing.

The table adwords_account is very small and doing a print of df.count() on sc_raw_report returns: 2022197

emr-5.28.0 spark 2.4.4 My cluster core: 15 r4.4xlarge (16 vCore, 122 GiB memory, EBS only storage) main: r5a.4xlarge (16 vCore, 128 GiB memory, EBS only storage)

with config for spark-submit --deploy-mode cluster:

--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf fs.s3a.attempts.maximum=30 --conf spark.sql.crossJoin.enabled=true --executor-cores 5 --num-executors 5 --conf spark.dynamicAllocation.enabled=false --conf spark.executor.memoryOverhead=3g --driver-memory 22g --executor-memory 22g --conf spark.executor.instances=49 --conf spark.default.parallelism=490 --conf spark.driver.maxResultSize=0 --conf spark.sql.broadcastTimeout=3600

Anyone know what I can do here?

EDIT: additional info: upgrading to 16 instances or r4.8xlarge (32CPU, 244RAM) did nothing either.

graph with step, then it goes idle before throwing the broadcast error enter image description here

Executors report few moment before the crash: enter image description here

the config:

spark.serializer.objectStreamReset  100
spark.sql.autoBroadcastJoinThreshold    -1
spark.executor.memoryOverhead   3g
spark.driver.maxResultSize  0
spark.shuffle.service.enabled   true
spark.rdd.compress  True
spark.stage.attempt.ignoreOnDecommissionFetchFailure    true
spark.sql.crossJoin.enabled true
hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
spark.scheduler.mode    FIFO
spark.driver.memory 22g
spark.executor.instances    5
spark.default.parallelism   490
spark.resourceManager.cleanupExpiredHost    true
spark.executor.id   driver
spark.driver.extraJavaOptions   -Dcom.amazonaws.services.s3.enableV4=true
spark.hadoop.fs.s3.getObject.initialSocketTimeoutMilliseconds   2000
spark.submit.deployMode cluster
spark.sql.broadcastTimeout  3600
spark.master    yarn
spark.sql.parquet.output.committer.class    com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
spark.ui.filters    org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.blacklist.decommissioning.timeout 1h
spark.sql.hive.metastore.sharedPrefixes com.amazonaws.services.dynamodbv2
spark.executor.memory   22g
spark.dynamicAllocation.enabled false
spark.sql.catalogImplementation hive
spark.executor.cores    5
spark.decommissioning.timeout.threshold 20
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem true
spark.hadoop.yarn.timeline-service.enabled  false
spark.yarn.executor.memoryOverheadFactor    0.1875
1
Currently, it is a hard limit in the spark that the broadcast variable size should be less than 8GB. Please specify the size of both the dataset. Can you repartition both the dataset before join with the join column and try?dassum
the data is partitioned by customer_id / date thus it's quite difficult to know exactly how large the dataset is. But for adwords_accounts it should be ~ 5MB and a print of df.count() on sc_raw_report returns 2022197Jay Cee

1 Answers

0
votes

After ShuffleMapStage, part of shuffle block needs to be broadcasted at driver. Please make sure Driver (in your case an AM in YARN ) has enough memory/overhead.

Could you post sc run time config ?