15
votes

I have a Spark Streaming application that writes parquet data from a stream.

sqlContext.sql(
  """
    |select
    |to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
    |hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
    |*
    |from events
    |where at >= 1473667200
  """.stripMargin)
  .coalesce(1)
  .write
  .mode(SaveMode.Append)
  .partitionBy("event_date", "event_hour", "verb")
  .parquet(Config.eventsS3Path)

This piece of code runs every hour, but over time the parquet write has slowed down. When we started it took 15 minutes to write the data; now it takes 40 minutes. The time taken is proportional to the amount of data already existing in that path. I tried running the same application against a new location, and that runs fast.

I have disabled schemaMerge and summary metadata:

sparkConf.set("spark.sql.hive.convertMetastoreParquet.mergeSchema","false")
sparkConf.set("parquet.enable.summary-metadata","false")

Using Spark 2.0.

Batch execution screenshots: against an empty directory vs. a directory with 350 folders.

3
The problem is that Spark tries to list all leaf nodes, and that part is very slow; it does this twice, which adds 13 * 2 minutes extra. Although I am unable to figure out why the file listing is slow - Gaurav Shah
Did you manage to find any solution? I encountered the same issue. In the spark-submit stderr I can see that Spark opens and seeks the old parquet files in S3 for reading. I just don't understand how to avoid this. - Niros
I could not manage to avoid reading the older partitions, but I did improve the partition read speed 10-fold, thereby reducing the overall time from 10 min to 1 min. stackoverflow.com/questions/39513505/… - Gaurav Shah
What do you think about writing the new data into local HDFS and then copying it to S3? That way, the listing will be fast - Niros
Might as well copy to a new location on S3 each time and then invoke mv - Gaurav Shah

3 Answers

1
votes

I've encountered this issue. The append mode is probably the culprit, in that finding the append location takes more and more time as the size of your parquet file grows.

One workaround I've found that solves this is to change the output path regularly. Merging and reordering the data from all the output dataframes is then usually not an issue.

// `time` here is the current batch Time and `timeOrigin` a fixed reference timestamp (both in ms),
// so `appendix` is the number of whole hours elapsed since the origin.
def appendix: String = ((time.milliseconds - timeOrigin) / (3600 * 1000)).toString

// Each hour gets its own root path, so the append only has to deal with that hour's files.
df.write.mode(SaveMode.Append).format("parquet").save(s"${outputPath}-H$appendix")
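
A minimal read-back sketch, assuming the ${outputPath}-H<N> naming above and a SparkSession called spark (both names are just for illustration): a path glob collects every hourly directory into one dataframe for the later merge.

// Hypothetical read-back: the glob matches every ${outputPath}-H<N> directory.
val merged = spark.read.parquet(s"$outputPath-H*")
merged.createOrReplaceTempView("events_merged")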
1
votes

Try to write the dataframe to EMR HDFS (hdfs://...) and then use s3-dist-cp to upload the data from HDFS to S3. Worked for me.
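
A rough sketch of that flow, assuming a hypothetical HDFS staging path and S3 destination; the s3-dist-cp step runs outside Spark (e.g. as an EMR step or shell command):

// Stage the hourly batch on HDFS first; listing there is cheap compared to S3.
df.write
  .mode(SaveMode.Append)
  .partitionBy("event_date", "event_hour", "verb")
  .parquet("hdfs:///staging/events")  // hypothetical staging path

// Then copy to S3 in bulk, for example:
//   s3-dist-cp --src hdfs:///staging/events --dest s3://my-bucket/events   (bucket and paths hypothetical)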

0
votes

It might be due to append mode. In this mode new files have to be generated with names different from those of the already existing files, so Spark lists the files in S3 (which is slow) every time.

We also set parquet.enable.summary-metadata a bit differently:

javaSparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false");
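
For the Scala code in the question, the equivalent would presumably be setting the same property on the SparkContext's Hadoop configuration (assuming sc is the SparkContext):

// Same property as above, set on the Hadoop configuration rather than via sparkConf.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")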