We are using CDH 5.13 with Spark 2.3.0 and S3Guard. After running the same job on EMR 5.x / 6.x with the same resources, we saw a 5-20x performance degradation. According to https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-committer-reqs.html, the default committer (since EMR 5.20) is not a good fit for S3A. We tested EMR-5.15.1 and got the same results as on Hadoop.
When I try to use the Magic Committer, I get:
py4j.protocol.Py4JJavaError: An error occurred while calling o72.save.
: java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
My code is below (S3Guard is also configured, via the EMR configuration):
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext, SparkSession
from pyspark.sql.functions import *
sconf = SparkConf()
sconf.set("spark.hadoop.fs.s3a.committer.name", "magic")
sconf.set("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
sconf.set("spark.sql.sources.commitProtocolClass", "com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol")
sconf.set("spark.sql.parquet.output.committer.class", "org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter")
sconf.set("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
sconf.set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")
sc = SparkContext(appName="s3acommitter", conf = sconf)
spark = SparkSession(sc)
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
sourceDF = spark.range(0, 10000)
datasets = "s3a://parquet/commiter-test"
sourceDF.write.format("parquet").save(datasets + "parquet")
sc.stop()
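As a side check: Hadoop never reads configuration keys it does not recognize, so a misspelled option has no effect and raises no error. Below is a minimal sketch of a pre-flight check on key names, independent of Spark. `KNOWN_KEYS` and `check_keys` are hypothetical names introduced here, and the list only covers the options used in this post.

```python
import difflib

# Hand-maintained list of the option names used in this post (assumption:
# extend it with whatever other options the job sets).
KNOWN_KEYS = [
    "fs.s3a.committer.name",
    "fs.s3a.committer.magic.enabled",
    "fs.s3a.committer.staging.conflict-mode",
    "mapreduce.outputcommitter.factory.scheme.s3a",
    "spark.sql.sources.commitProtocolClass",
    "spark.sql.parquet.output.committer.class",
]

HADOOP_PREFIX = "spark.hadoop."


def check_keys(settings):
    """Map each unrecognized key to its closest known name, or None."""
    problems = {}
    for key in settings:
        # Spark forwards "spark.hadoop.X" to the Hadoop configuration as "X".
        name = key[len(HADOOP_PREFIX):] if key.startswith(HADOOP_PREFIX) else key
        if name not in KNOWN_KEYS:
            close = difflib.get_close_matches(name, KNOWN_KEYS, n=1, cutoff=0.8)
            problems[key] = close[0] if close else None
    return problems
```

Calling `check_keys` on a dict of the settings before passing them to `SparkConf` returns an empty dict when every key is in the list, and otherwise maps each suspect key to the nearest known spelling.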
At https://repo.hortonworks.com/content/repositories/releases/org/apache/spark/spark-hadoop-cloud_2.11/ I can't find a jar for Spark 2.4.4 and Hadoop 3.2.1.
How can I enable the Magic Committer on EMR?
The Spark log:
20/11/25 21:49:38 INFO ParquetFileFormat: Using user defined output committer for Parquet: com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
20/11/25 21:49:38 WARN ParquetOutputFormat: Setting parquet.enable.summary-metadata is deprecated, please use parquet.summary.metadata.level
20/11/25 21:49:38 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
20/11/25 21:49:38 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
20/11/25 21:49:38 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
20/11/25 21:49:38 INFO EmrOptimizedParquetOutputCommitter: EMR Optimized committer is not supported by this filesystem (org.apache.hadoop.fs.s3a.S3AFileSystem)
20/11/25 21:49:38 INFO EmrOptimizedParquetOutputCommitter: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
20/11/25 21:49:38 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
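In the log above, the EMR optimized committer reports that it does not support S3AFileSystem and then falls back to FileOutputCommitter (algorithm version 1), so the S3A committer settings are not being picked up. Below is a minimal sketch for extracting the committer class names from such a log, assuming the message formats shown above. `committers_in_log` is a hypothetical helper, and treating the last class mentioned as the one in effect is only a heuristic.

```python
import re

# Matches "... output committer class <name>" and
# "... output committer for Parquet: <name>" as they appear in the log above.
COMMITTER_RE = re.compile(r"output committer (?:class|for Parquet:) (\S+)")


def committers_in_log(lines):
    """Return committer class names in the order the log mentions them."""
    found = []
    for line in lines:
        match = COMMITTER_RE.search(line)
        if match:
            found.append(match.group(1))
    return found
```

For the log in this post, the last element of the returned list would be `org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter`, which is a quick way to confirm after a run whether the intended committer was used.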