0 votes

We are using CDH 5.13 with Spark 2.3.0 and S3Guard. After running the same job on EMR 5.x / 6.x with the same resources, we got a 5-20x performance degradation. According to https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-committer-reqs.html, the default committer (since EMR 5.20) is not good for S3A. We also tested EMR 5.15.1 and got the same results as on our Hadoop cluster.

If I try to use the magic committer, I get:

py4j.protocol.Py4JJavaError: An error occurred while calling o72.save.
: java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

My code is as follows (S3Guard is also configured via the EMR configuration):

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

sconf = SparkConf()
# S3A committer settings: use the magic committer through the path-output-committer bindings
sconf.set("spark.hadoop.fs.s3a.committer.name", "magic")
sconf.set("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
sconf.set("spark.sql.sources.commitProtocolClass", "com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol")
sconf.set("spark.sql.parquet.output.committer.class", "org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter")
sconf.set("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
sconf.set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")

sc = SparkContext(appName="s3acommitter", conf=sconf)

spark = SparkSession(sc)
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()  # handle to the Hadoop configuration (unused here)

# Write a small test dataset as Parquet to S3 via s3a://
sourceDF = spark.range(0, 10000)
datasets = "s3a://parquet/commiter-test"
sourceDF.write.format("parquet").save(datasets + "/parquet")
sc.stop()

At https://repo.hortonworks.com/content/repositories/releases/org/apache/spark/spark-hadoop-cloud_2.11/ I can't find a jar for Spark 2.4.4 & Hadoop 3.2.1.

How can I enable the magic committer on EMR?

The Spark Log:

20/11/25 21:49:38 INFO ParquetFileFormat: Using user defined output committer for Parquet: com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
20/11/25 21:49:38 WARN ParquetOutputFormat: Setting parquet.enable.summary-metadata is deprecated, please use parquet.summary.metadata.level
20/11/25 21:49:38 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
20/11/25 21:49:38 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
20/11/25 21:49:38 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
20/11/25 21:49:38 INFO EmrOptimizedParquetOutputCommitter: EMR Optimized committer is not supported by this filesystem (org.apache.hadoop.fs.s3a.S3AFileSystem)
20/11/25 21:49:38 INFO EmrOptimizedParquetOutputCommitter: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
20/11/25 21:49:38 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
Comments:

Is there a reason you are using S3A rather than EMRFS (s3:// paths) on EMR? (Jonathan Kelly)
@JonathanKelly Yes, we are using S3Guard for Hadoop & Presto. (Valeriy Solovyov)
@JonathanKelly Also, I'd like to have the source code available in case of debugging. (BlueSheepToken)

2 Answers

1 vote

I believe that after the announcement "Amazon S3 Update – Strong Read-After-Write Consistency", most people will drop S3Guard from their environments.

But we did improve the overall runtime of our jobs (which use S3A on EMR) from 45 minutes-2 hours down to about 2 minutes:

We added the following to the EMR configuration (a per-job PySpark equivalent is sketched after this list):

  • Switch fs.s3a.impl from org.apache.hadoop.fs.s3a.S3AFileSystem to com.amazon.ws.emr.hadoop.fs.EmrFileSystem:

    { "classification": "core-site", "properties": { "fs.s3a.impl": "com.amazon.ws.emr.hadoop.fs.EmrFileSystem" }, "configurations": [] }

  • Disable parquet.enable.summary-metadata:

    "parquet.enable.summary-metadata": "false"

From Cloudera's "Improving Spark Performance with Cloud Storage":

When parquet.enable.summary-metadata is set to true, as soon as the write operation finishes the driver performs a sequential scan of the footers of all the written Parquet files in order to retrieve their schemas and create the summary file _common_metadata in the output directory. This is a single-threaded operation and takes a lot of time: the more Parquet files are generated, the more files have to be inspected and the longer it takes.

The main point is that Parquet summary files are not particularly useful nowadays, since:

  • when schema merging is disabled, we assume the schemas of all Parquet part-files are identical, so we can read the footer from any part-file;

  • when schema merging is enabled, we need to read the footers of all files anyway to do the merge.

These are some of the reasons why writing Parquet summary files was disabled by default in recent Spark versions:

https://issues.apache.org/jira/browse/SPARK-15719
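As a small illustration of those two points (the path below is a placeholder), reading a Parquet dataset does not depend on summary files in either mode:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-read-sketch").getOrCreate()

# Placeholder path to a dataset written without _metadata/_common_metadata summary files
path = "s3a://my-bucket/output/parquet"

# Default: schema merging disabled, so the schema is taken from a single part-file footer
df = spark.read.parquet(path)

# With schema merging enabled, every part-file footer is read anyway,
# so a summary file would not save any work here either
df_merged = spark.read.option("mergeSchema", "true").parquet(path)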

1 vote

You are getting the ClassNotFoundException because some of the binding classes in Spark (in the spark-hadoop-cloud module) aren't on the classpath. Even if they were, you'd be blocked at the next hurdle: those committers aren't shipped with EMR.

Amazon EMR has its own version of the S3A committers; see "Using the EMRFS S3-optimized Committer".
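A minimal sketch of relying on that committer instead, assuming you can write through EMRFS (s3:// rather than s3a:// paths); the property below is the one documented for the EMRFS S3-optimized committer, and the bucket path is a placeholder:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
# The EMRFS S3-optimized committer is enabled by default since EMR 5.20; set explicitly here for clarity
conf.set("spark.sql.parquet.fs.optimized.committer.optimization-enabled", "true")

sc = SparkContext(appName="emrfs-optimized-committer", conf=conf)
spark = SparkSession(sc)

# Write through EMRFS (s3://) so the optimized committer is actually used;
# with s3a:// it falls back to FileOutputCommitter, as the question's log shows
spark.range(0, 10000).write.format("parquet").save("s3://my-bucket/output/parquet")
sc.stop()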