3
votes

Problem Statement

I've read a partitioned CSV file into a Spark DataFrame.

In order to take advantage of Delta tables, I'm trying to simply export it as Delta into a directory inside Azure Data Lake Storage Gen2. I'm using the code below in a Databricks notebook:

%scala

df_nyc_taxi.write.partitionBy("year", "month").format("delta").save("/mnt/delta/")

The whole DataFrame is around 160 GB.

Hardware Specs

I'm running this code on a cluster with 12 cores and 42 GB of RAM.

However, it looks like the whole writing process is being handled by Spark/Databricks sequentially, i.e. in a non-parallel fashion:

[screenshot: Spark UI showing the write tasks executing sequentially]

The DAG Visualization looks like the following:

[screenshot: DAG visualization of the write job]

All in all, it looks like this will take 1-2 hours to execute.

Questions

  • Is there a way to actually make Spark write to different partitions in parallel?
  • Could the problem be that I'm trying to write the Delta table directly to Azure Data Lake Storage?
2
try repartition(your_partition_columns).write.partitionBy("year", "month") - eliasah
Thanks for the input @eliasah. Doesn't repartition expect an integer rather than a list of columns? - born to hula
When I try: df_nyc_taxi.repartition("year", "month").write.partitionBy("year", "month").format("delta").save("/mnt/delta/") I get: error: overloaded method value repartition with alternatives: (partitionExprs: org.apache.spark.sql.Column*)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] <and> (numPartitions: Int,partitionExprs: org.apache.spark.sql.Column*)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] cannot be applied to (String, String) df_nyc_taxi.repartition("year", "month").write.partitionBy("year", "month").format("delta").save("/mnt/delta/") - born to hula
repartition can take a list of columns as repeated arguments. check this : stackoverflow.com/questions/52521067/… - eliasah
Nice, will check. Thanks - born to hula
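
For reference, here is a minimal sketch of the two repartition overloads that the error message above lists (the DataFrame and column names are taken from the question; the explicit partition count is just an example value):

import org.apache.spark.sql.functions.col

// Overload 1: repartition by expressions only; the number of resulting partitions
// defaults to spark.sql.shuffle.partitions.
val byColumns = df_nyc_taxi.repartition(col("year"), col("month"))

// Overload 2: explicit partition count plus expressions.
val byCountAndColumns = df_nyc_taxi.repartition(400, col("year"), col("month"))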

2 Answers

5
votes

To follow up on @eliasah's comment, perhaps you can try this:

import org.apache.spark.sql.functions._
// Add a random bucket column so each year/month partition is split across many tasks.
df_nyc_taxi.repartition(col("year"), col("month"), (rand() * 200).cast("int")).write.partitionBy("year", "month").format("delta").save("/mnt/delta/")

@eliasah's suggestion will most likely create only one file for each "/mnt/delta/year=XX/month=XX" directory, and only one worker will write the data to each file. The extra random column slices the data further (in this case I'm dividing the data within each year/month partition into up to 200 smaller buckets; you can adjust the number), so that more workers can write concurrently.
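
If you want to tune the 200 above, here is a rough sizing sketch, assuming the ~160 GB from the question and a target output file size of about 128 MB (both numbers are assumptions to adjust for your data):

// Rough sizing sketch: how many ~128 MB slices would cover the whole dataset.
val totalSizeMb  = 160L * 1024                  // ~160 GB of input, as stated in the question
val targetFileMb = 128L                         // assumed target size per output file
val buckets      = totalSizeMb / targetFileMb   // = 1280 slices in total

// A value in this ballpark could replace the hard-coded 200 above,
// e.g. (rand() * buckets).cast("int").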

P.S: sry I don't have enough rep to comment yet :'D

1
votes

This is similar to the other answer; however, I have added a persist after the repartition and before the write. The persisted data goes into memory, and whatever doesn't fit spills to disk, which is still faster than reading it again. It has worked well for me in the past. I chose 1250 partitions because 128 MB is my usual target partition size (roughly 160 GB / 128 MB). Spark became what it is because of in-memory computation, so it is good practice to use it whenever you have the chance.

from pyspark import StorageLevel
from pyspark.sql import functions as F

# Repartition by year/month, cache to memory (spilling to disk), then write as Delta.
df_nyc_taxi.repartition(1250, F.col("year"), F.col("month"))\
    .persist(StorageLevel.MEMORY_AND_DISK)\
    .write.partitionBy("year", "month")\
    .format("delta").save("/mnt/delta/")