32
votes

I'm writing to see if anyone knows how to speed up S3 write times from Spark running in EMR?

My Spark job takes over 4 hours to complete, but the cluster is only under load during the first 1.5 hours.

I was curious about what Spark was doing all this time. I looked at the logs and found many s3 mv commands, one for each file. Then, looking directly at S3, I saw that all my files are in a _temporary directory.

Second, I'm concerned about my cluster cost. It appears I need to buy 2 hours of compute for this specific task, but I end up buying up to 5 hours. I'm curious whether EMR AutoScaling can help with cost in this situation.

Some articles discuss changing the file output committer algorithm but I've had little success with that.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

Writing to the local HDFS is quick. I'm curious if issuing a hadoop command to copy the data to S3 would be faster?

7
Finally, which implementation did you use? I am stuck with the same problem. - Programmer
@JaspinderVirdee Write your data to the local HDFS directory, then use s3-dist-cp to copy it to S3. Also, if your EMR cluster is missing the s3-dist-cp command, you have to have Hadoop listed in your create-cluster command, for example: --applications Name=Hadoop Name=Spark Name=Ganglia Name=zeppelin - jspooner
Do you mean that I use S3 just for backup? For example, I am currently using Spark Streaming, and my data is partitioned on the key "city" and saved to S3 directly. Every 1-minute stream batch brings more data, which is added under each city. How would I append this data on S3 using your strategy (save locally, then copy to S3)? - Programmer
The last step of my stream is a save to the filesystem (HDFS or S3). How can I push the changes to S3 after they are successfully written to HDFS? Is it even possible to do this with Scala/Python code, or is s3-dist-cp used from the shell only? - Programmer

7 Answers

22
votes

What you are seeing is a problem with the output committer and S3. The job commit applies fs.rename to the _temporary folder, and since S3 does not support rename, that single call ends up copying and deleting every file from _temporary to its final destination.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2") only works with Hadoop 2.7 or later. It copies each file out of _temporary at task commit rather than at job commit, so the work is distributed and runs pretty fast.
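To make the difference between the two algorithm versions concrete, here is a toy model in Python. It is not Hadoop's actual code: the function and class names are made up for illustration, and it only counts how many per-file copies happen on the executors (in parallel) versus on the driver (serially), assuming every rename on S3 is a copy plus a delete.

```python
from dataclasses import dataclass


@dataclass
class CommitCost:
    """Counts per-file copies by who performs them (hypothetical model)."""
    task_copies: int = 0    # done at task commit, spread across executors
    driver_copies: int = 0  # done at job commit, by the driver alone


def simulate_commit(files_per_task, algorithm_version):
    """Toy model: on S3, renaming a file means copying it and deleting the original."""
    cost = CommitCost()
    for n_files in files_per_task:
        # Both versions move each task's output once at task commit.
        # v1 moves it to a job-level temporary directory;
        # v2 moves it straight to the final destination.
        cost.task_copies += n_files
        if algorithm_version == 1:
            # v1 job commit: the driver moves every file a second time,
            # from the job temporary directory to the destination.
            cost.driver_copies += n_files
    return cost
```

With 200 tasks writing 10 files each, this model gives the driver 2000 copies to do under version 1 and none under version 2, which matches the long tail of s3 mv operations described in the question.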

If you use an older version of Hadoop, I would use Spark 1.6 and set:

sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

*Note that it does not work with speculation turned on or when writing in append mode.

**Also note that it was removed in Spark 2.0 (replaced by algorithm.version=2).

BTW, in my team we actually write to HDFS with Spark and use DistCp jobs (specifically s3-dist-cp) in production to copy the files to S3. We do this for several other reasons (consistency, fault tolerance), so it is not necessary. You can write to S3 pretty fast using what I suggested.
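If you do go the HDFS-then-copy route and want to trigger the copy from code rather than from a shell, one option is to launch s3-dist-cp as a subprocess once the Spark write has finished. The following is a minimal sketch, assuming s3-dist-cp is on the PATH of the node running the script; the function names and the paths are hypothetical, and only the --src and --dest options are used.

```python
import shlex
import subprocess


def build_s3_dist_cp_command(src, dest):
    """Return the argument list for an s3-dist-cp copy from src to dest."""
    return ["s3-dist-cp", "--src", src, "--dest", dest]


def copy_to_s3(src, dest, dry_run=True):
    """Print the command, and run it unless dry_run is set."""
    command = build_s3_dist_cp_command(src, dest)
    print(" ".join(shlex.quote(part) for part in command))
    if not dry_run:
        # Raises CalledProcessError if s3-dist-cp exits with a non-zero status.
        subprocess.run(command, check=True)
    return command


if __name__ == "__main__":
    # Hypothetical paths; replace with your own HDFS directory and bucket.
    copy_to_s3("hdfs:///output/my-job", "s3://my-bucket/output/my-job")
```

The dry_run default only prints the command, so the sketch can be tried on a machine without s3-dist-cp installed; pass dry_run=False on the cluster to actually run the copy.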

7
votes

I had a similar use case where I used Spark to write to S3 and had performance issues. The primary reason was that Spark was creating a lot of zero-byte part files, and renaming the temp files to their final names was slowing down the write. I tried the approaches below as workarounds:

  1. Wrote the Spark output to HDFS and used Hive to write to S3. Performance was much better because Hive created fewer part files. The problem I had (also when using Spark directly) was that the delete action was not granted in the policy in the prod environment for security reasons. The S3 bucket was KMS-encrypted in my case.

  2. Wrote the Spark output to HDFS, copied the HDFS files to local disk, and used aws s3 cp to push the data to S3. This approach gave the second-best results. I opened a ticket with Amazon and they suggested going with this one.

  3. Used s3-dist-cp to copy files from HDFS to S3. This worked with no issues, but was not performant.

7
votes

The direct committer was pulled from Spark because it wasn't resilient to failures. I would strongly advise against using it.

There is work ongoing in Hadoop, s3guard, to add 0-rename committers, which will be O(1) and fault tolerant; keep an eye on HADOOP-13786.

Ignoring "the Magic committer" for now, the Netflix-based staging committer will ship first (hadoop 2.9? 3.0?)

  1. Writes the work to the local FS.
  2. In task commit, issues uncommitted multipart PUT operations to write the data, but not materialize it.
  3. Saves the information needed to commit the PUT to HDFS, using the original "algorithm 1" file output committer.
  4. Implements a job commit which uses the file output commit of HDFS to decide which PUTs to complete and which to cancel.

Result: task commit takes data/bandwidth seconds, but job commit takes no longer than the time to do 1-4 GETs on the destination folder and a POST for every pending file, the latter being parallelized.
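The flow above can be sketched as a small in-memory model in Python. This is only an illustration of the idea, not the Netflix or Hadoop implementation: the class and method names are made up, a dict stands in for S3, and a set stands in for the commit records kept in HDFS.

```python
class ToyStagingCommitter:
    """In-memory model of the staging-committer flow (hypothetical names)."""

    def __init__(self):
        self.pending_uploads = {}        # attempt id -> {key: data}, uploaded but not visible
        self.committed_attempts = set()  # stand-in for the commit records kept in HDFS
        self.destination = {}            # objects visible to readers of the destination
        self.aborted = []                # attempt ids whose uploads were cancelled

    def upload_pending(self, attempt_id, files):
        # Models the uncommitted multipart PUTs: data is stored but not materialized.
        self.pending_uploads[attempt_id] = dict(files)

    def commit_task(self, attempt_id, files):
        # Task commit: upload the data, then record that this attempt committed.
        self.upload_pending(attempt_id, files)
        self.committed_attempts.add(attempt_id)

    def commit_job(self):
        # Job commit: complete the uploads of committed attempts, cancel the rest.
        for attempt_id, files in self.pending_uploads.items():
            if attempt_id in self.committed_attempts:
                self.destination.update(files)  # one "complete upload" per pending file
            else:
                self.aborted.append(attempt_id)
        self.pending_uploads.clear()
```

In this model nothing is visible in the destination until commit_job runs, and an attempt that uploaded data but never recorded its commit (for example, one that failed partway through task commit) simply has its uploads cancelled. No rename is needed at any point.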

You can pick up the committer this work is based on, from Netflix, and probably use it in Spark today. Do set the file commit algorithm = 1 (should be the default) or it won't actually write the data.

2
votes

I had the same issue. I found a solution by changing the S3 protocol: originally I was using s3a:// to read and write the data, then I changed to just s3:// and it works perfectly. My process actually appends data.

1
votes

What do you see in spark output? If you see lots of rename operations, read this

1
votes

We experienced the same on Azure using Spark on WASB. We finally decided not to use the distributed storage directly with Spark. We did spark.write to a real hdfs:// destination and developed a specific tool that does: hadoop copyFromLocal hdfs:// wasb://. HDFS is then our temporary buffer before archiving to WASB (or S3).

0
votes

How large are the files you are writing to? Having one core write a very large file is going to be much slower than splitting the file up and having multiple workers write out smaller files.
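As a rough way to pick how many output files to aim for, you can divide the total output size by a target file size. The helper below is a hypothetical sketch in Python; the 128 MiB default is an assumption chosen for illustration, not a recommendation from Spark or EMR.

```python
def partitions_for(total_bytes, target_file_bytes=128 * 1024 ** 2):
    """Return how many files are needed so each is at most target_file_bytes."""
    if total_bytes < 0 or target_file_bytes <= 0:
        raise ValueError("sizes must be non-negative, and the target must be positive")
    # Integer ceiling division, with a floor of one partition.
    return max(1, (total_bytes + target_file_bytes - 1) // target_file_bytes)
```

For example, 10 GiB of output at the assumed 128 MiB target works out to 80 partitions. In Spark, the result could be passed to repartition on the DataFrame before writing, so that many tasks each write one moderately sized file.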