
I have a very large dataframe, around 2 TB in size. There are two columns by which I can partition it: MODULE and DATE. If I partition by MODULE, different modules can share the same dates; for example, MODULE A might have the dates 2020-07-01 and 2020-07-02, while MODULE B might have 2020-07-01 and 2020-07-05, etc. I need to first partition by MODULE, do some aggregations and joins, and then finally partition and store the result by DATE. I am using PySpark.

After I do the aggregations and joins for each MODULE, I append the result to a Parquet file, then load that entire Parquet file into a dataframe and partition it by DATE. The problem is that the Spark job terminates due to memory issues. Can I write the DATE partitions directly while I am still working within a MODULE partition, roughly like the sketch below? The layout would go from something like s3://path/MODULE=A --> s3://path/DATE=2020-07-01, where both modules A and B end up inside the partition DATE=2020-07-01.
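A rough sketch of what I have in mind (module_dirs and the output path are placeholders, and the aggregation step is omitted):

for modulepath in module_dirs:  # one iteration per MODULE=... prefix
    df = spark.read.parquet('s3://' + modulepath + '/')
    # aggregations and joins would go here
    df.write.mode('append').partitionBy("DATE").parquet("s3://path/final.parquet")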

This was my original code, which failed due to very long run times on the cluster and running out of memory:

inpath="s3://path/file/"
outpath="s3://path/file_tmp.parquet"
fs = s3fs.S3FileSystem(anon=False)
uvaDirs = fs.ls(inpath)

#Load Data by Module
for uvapath in uvaDirs:
    customPath='s3://' + uvapath + '/'
    df1=spark.read.parquet(customPath)
    #Perform aggregations and joins
    df1.write.mode('append').parquet(outpath)
    
# Load - partition by date
df2=spark.read.parquet("s3://path/file_tmp.parquet")
df2.write.mode('overwrite').partitionBy("DATE").parquet("s3://path/final.parquet")

It successfully creates file_tmp.parquet, but fails when loading it back and partitioning by date. Any help would be greatly appreciated! Thank you.


1 Answer


A Delta Lake table can do this; under the hood, Delta stores its data as Parquet. You can rewrite one partition at a time with replaceWhere, for example to compact its files (dataChange = "false" marks the rewrite as a compaction rather than new data):

# partition is a predicate string such as "DATE = '2020-07-01'"
# numFilesPerPartition controls how many files the partition is compacted into
(spark.read
 .format("delta")
 .load(path)
 .where(partition)
 .repartition(numFilesPerPartition)
 .write
 .option("dataChange", "false")
 .format("delta")
 .mode("overwrite")
 .option("replaceWhere", partition)
 .save(path))

# Clean up the old files left behind by the compaction
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, path)
deltaTable.vacuum(0)

Reference: https://docs.delta.io/latest/best-practices.html#-delta-compact-files&language-python
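For the use case in the question, a minimal sketch of the write side might look like this (assuming the delta-spark package is configured on the cluster; uvaDirs comes from the question, delta_path is a placeholder output location, and the aggregation step is omitted):

# Append each module's aggregated result, partitioned by DATE, into one Delta table
delta_path = "s3://path/final_delta"
for uvapath in uvaDirs:
    df = spark.read.parquet('s3://' + uvapath + '/')
    # aggregations and joins would go here
    (df.write
       .format("delta")
       .mode("append")
       .partitionBy("DATE")
       .save(delta_path))

# Each DATE partition can then be compacted with the replaceWhere pattern above,
# and old files removed with vacuum.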