4
votes

I have a Google Dataproc cluster running and am submitting a PySpark job to it that reads a file from Google Cloud Storage (a 945 MB CSV file with 4 million rows; reading it takes 48 seconds in total) into a PySpark DataFrame and applies a function to that DataFrame (parsed_dataframe = raw_dataframe.rdd.map(parse_user_agents).toDF(), which takes about 4 or 5 seconds).

I then have to save those modified results back to Google Cloud Storage as a GZIP'd CSV or Parquet file. I can also save those modified results locally, and then copy them to a GCS bucket.

I repartition the dataframe via parsed_dataframe = parsed_dataframe.repartition(15) and then try saving that new dataframe via

parsed_dataframe.write.parquet("gs://somefolder/proto.parquet")

parsed_dataframe.write.format("com.databricks.spark.csv").save("gs://somefolder/", header="true")

parsed_dataframe.write.format("com.databricks.spark.csv").options(codec="org.apache.hadoop.io.compress.GzipCodec").save("gs://nyt_regi_usage/2017/max_0722/regi_usage/", header="true")

Each of those methods (and their variants with fewer/more partitions, and saving locally vs. to GCS) takes over 60 minutes for the 4 million rows (945 MB), which is a pretty long time.

How can I optimize this/make saving the data faster?

It's worth noting that both the Dataproc cluster and the GCS bucket are in the same region/zone, and that the cluster has an n1-highmem-8 (8 vCPU, 52 GB memory) master node with 15+ worker nodes (variables I'm still testing out).

3
That's definitely abnormally high; did you verify that the output really was only creating 15 files after repartition(15)? Did it also take a long time if saving to cluster-local HDFS instead, like write.parquet("hdfs:///tmp/foo")? – Dennis Huo
@DennisHuo I did verify that the output created (at least) 15 files after the repartition. Saving to cluster-local HDFS takes roughly the same amount of time (1 hour). – Ryan D'souza

3 Answers

4
votes

Some red flags here.

1) Reading as a DataFrame, converting to an RDD to process, and converting back to a DataFrame is very inefficient by itself. You lose the Catalyst and Tungsten optimizations by dropping down to RDDs. Try changing your function to work within the DataFrame API (see the sketch after point 2).

2) Repartitioning forces a shuffle but, more importantly, it means the computation will now be limited to the executors holding those 15 partitions. If your executors are large (7 cores, ~40 GB RAM), this likely isn't a problem.
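For illustration, here is a minimal sketch of keeping the parsing inside the DataFrame API with a UDF instead of the rdd.map round trip. The column name "user_agent" and the per-value parser are hypothetical placeholders for your actual logic:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def parse_user_agent(ua):
    # Hypothetical placeholder; replace with your actual parsing logic.
    return ua.lower() if ua else None

parse_udf = F.udf(parse_user_agent, StringType())

# Everything outside the UDF still benefits from Catalyst/Tungsten.
parsed_dataframe = raw_dataframe.withColumn("parsed_user_agent", parse_udf(F.col("user_agent")))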

What happens if you write the output before repartitioning?
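For example, something along these lines would show whether the repartition is the bottleneck (output paths are placeholders); coalesce is an alternative that reduces the partition count without a full shuffle:

parsed_dataframe.write.parquet("gs://somefolder/no_repartition.parquet")

parsed_dataframe.coalesce(15).write.parquet("gs://somefolder/coalesce_15.parquet")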

Please provide more code and, ideally, Spark UI output to show how long each step of the job takes.

0
votes

Try this; it should take a few minutes:

your_dataframe.write.csv("export_location", mode="overwrite", header=True, sep="|")

Make sure you add mode="overwrite" if you want to overwrite an old version.
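If you specifically need the gzip'd CSV output mentioned in the question, the built-in csv writer also accepts a compression option (the export path is a placeholder):

your_dataframe.write.csv("export_location", mode="overwrite", header=True, sep="|", compression="gzip")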

0
votes

Are you calling an action on parsed_dataframe?

As you wrote it above, Spark will not compute your function until you call write. If you're not calling an action, see how long parsed_dataframe.cache().count() takes. I suspect it will take an hour, and that running parsed_dataframe.write(...) afterwards will be much faster.
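A rough sketch of that check, reusing parsed_dataframe and the output path from the question:

import time

start = time.time()
parsed_dataframe.cache()
row_count = parsed_dataframe.count()  # forces the map/parse step to actually run
print("compute + cache: %.1f s for %d rows" % (time.time() - start, row_count))

start = time.time()
parsed_dataframe.write.parquet("gs://somefolder/proto.parquet")
print("write: %.1f s" % (time.time() - start))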