3
votes

I have a large dataset in Parquet format (~1 TB in size) that is partitioned into two hierarchies: CLASS and DATE. There are only 7 classes, but the DATE values keep increasing from 2020-01-01 onwards. My data is partitioned by CLASS first and then DATE.

So something like:

CLASS1---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

CLASS2---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

I load my data by CLASS in a for-loop. If I load the entire Parquet dataset at once, YARN kills the job because it overloads the memory on the instances. I still have to load all the dates, since I am doing a percentile calculation in my modeling. This method takes about 23 hours to complete.

However, if I repartition so that I only have the CLASS partition, the job takes about 10 hours. Does having too many sub-partitions slow down the Spark executor jobs? I keep the partition hierarchy as CLASS -> DATE only because I need to append new data by DATE every day. If having only one partition level is more efficient, then I would have to repartition to just the CLASS partition every day after loading the new data. Could someone explain why having a single partition level works faster? And if so, what would be the best way to partition the data on a daily basis by appending, without repartitioning the entire dataset?

Thank You

EDIT: I use a for-loop over the file structure to loop by CLASS partition, like so:

import s3fs

fs = s3fs.S3FileSystem(anon=False)
inpath = "s3://bucket/file.parquet/"

dirs = fs.ls(inpath)                      # one entry per CLASS=<n> prefix
for path in dirs:
    customPath = 's3://' + path + '/'
    class_name = path.split('=')[1]       # extract the CLASS value from the directory name
    df = spark.read.parquet(customPath)
    outpath = "s3://bucket/Output_" + class_name + ".parquet"
    # Perform calculations
    df.write.mode('overwrite').parquet(outpath)

The loaded df will have all the dates for CLASS=1. I then write the results out as a separate Parquet file for each CLASS, so that I have 7 Parquet files:

Output_1.parquet
Output_2.parquet
Output_3.parquet
Output_4.parquet
Output_5.parquet
Output_6.parquet
Output_7.parquet

Merging the 7 Parquet files into a single one afterwards is not a problem, as the resulting files are much smaller.

1
Is your partition structure organized as CLASS=1/DATE=2020-01-01? If not, this will be tough. - Lamanus
This is not a single- vs. multiple-partition problem. Even with many partitions, if you load the whole dataset and then filter down to just the portion of the partitions you need for your task, that should not cause an OOM or get the job killed. - Lamanus
@Lamanus Yes, it is organized that way. But it seems to take longer to do the same job than when I have just the CLASS partition. - thentangler
You do not need the for-loop to load that; just load the root path. BTW, the partition column names should be lowercase. - Lamanus

1 Answer

2
votes

I have data partitioned by three columns: year, month, and id. The folder hierarchy is

year=2020/month=08/id=1/*.parquet
year=2020/month=08/id=2/*.parquet
year=2020/month=08/id=3/*.parquet
...
year=2020/month=09/id=1/*.parquet
year=2020/month=09/id=2/*.parquet
year=2020/month=09/id=3/*.parquet

and I can read the DataFrame by loading the root path.

val df = spark.read.parquet("s3://mybucket/")

The partition columns are then automatically added to the DataFrame, and you can filter on them:

val df_filtered = df.filter("year = '2020' and month = '09'")

When you do something with df_filtered, Spark will read only the matching partitions (partition pruning).
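
Applied to the layout in the question, a minimal sketch would look roughly like this (the bucket path and column names are taken from the question and are assumptions):

// Load the root of the partitioned dataset; CLASS and DATE become columns.
val df = spark.read.parquet("s3://bucket/file.parquet/")

// Thanks to partition pruning, only the CLASS=1 directories are actually scanned.
val df_class1 = df.filter("CLASS = '1'")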


For your repeated processing, you can use Spark's fair scheduler. Add a fair.xml file to src/main/resources of your project with the content below,

<?xml version="1.0"?>

<allocations>
    <pool name="fair">
        <schedulingMode>FAIR</schedulingMode>
        <weight>10</weight>
        <minShare>0</minShare>
    </pool>
</allocations>

and set the Spark configuration after creating the Spark session.

spark.sparkContext.setLocalProperty("spark.scheduler.mode", "FAIR")
spark.sparkContext.setLocalProperty("spark.scheduler.allocation.file", getClass.getResource("/fair.xml").getPath)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "fair")

Then you can do your jobs in parallel. You may want to parallelize the work by CLASS, so

val classes = (1 to 7).par      // parallel collection: one element per CLASS
val date = "2020-09-25"

classes.foreach { i =>
    val df_filtered = df.filter(s"CLASS = '$i' and DATE = '$date'")

    // Do your job
}

The code will process the different CLASS values at the same time.
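
As for appending new data every day without repartitioning the whole dataset, a minimal sketch would be to write only the new data in append mode with the existing partition columns (df_today, the bucket path, and the column names are assumptions for illustration):

// Append mode only adds new files; existing CLASS/DATE directories are untouched.
df_today
  .write
  .mode("append")
  .partitionBy("CLASS", "DATE")
  .parquet("s3://bucket/file.parquet/")

This keeps the CLASS -> DATE layout intact while still allowing the root-path read shown above.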