3 votes

I have a parquet dataframe, with the following structure:

  1. ID String
  2. DATE Date
  3. 480 other feature columns of type Double

I have to replace each of the 480 feature columns with their corresponding weighted moving averages, with a window of 250. Initially, I am trying to do this for a single column, with the following simple code:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col}

var data = sparkSession.read.parquet("s3://data-location")
var window = Window.rowsBetween(-250, Window.currentRow - 1).partitionBy("ID").orderBy("DATE")
data.withColumn("Feature_1", col("Feature_1").divide(avg("Feature_1").over(window))).write.parquet("s3://data-out")
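For reference, the way I would eventually extend this to all 480 columns is roughly the following sketch; the feature column names (Feature_1 ... Feature_480) are assumed here, and it reuses data and window from above:

// Sketch only: apply the same window expression to every feature column in one select.
// Column names Feature_1 ... Feature_480 are assumed, not taken from the real schema.
val featureCols = (1 to 480).map(i => s"Feature_$i")
val wmaCols = featureCols.map(c => col(c).divide(avg(c).over(window)).as(c))
data.select((col("ID") +: col("DATE") +: wmaCols): _*).write.parquet("s3://data-out")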

The input data contains 20 million rows, and each ID has about 4,000-5,000 dates associated with it. I have run this on an AWS EMR cluster (m4.xlarge instances), with the following results for one column:

  • 4 executors X 4 cores X 10 GB + 1 GB for YARN overhead (so 2.5 GB per task, 16 concurrently running tasks): took 14 minutes
  • 8 executors X 4 cores X 10 GB + 1 GB for YARN overhead (so 2.5 GB per task, 32 concurrently running tasks): took 8 minutes

I have tweaked the following settings, with the hope of bringing the total time down:

  • spark.memory.storageFraction 0.02
  • spark.sql.windowExec.buffer.in.memory.threshold 100000
  • spark.sql.constraintPropagation.enabled false

The second one helped prevent some of the spilling seen in the logs, but none of them improved the actual performance.
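For completeness, this is roughly how I apply those settings (a sketch; I set them when building the session, but they could equally be passed as --conf flags to spark-submit):

import org.apache.spark.sql.SparkSession

// Sketch: the three settings above, applied at session construction time.
// The values are the ones I tried; nothing else here comes from the real job.
val sparkSession = SparkSession.builder()
  .config("spark.memory.storageFraction", "0.02")
  .config("spark.sql.windowExec.buffer.in.memory.threshold", "100000")
  .config("spark.sql.constraintPropagation.enabled", "false")
  .getOrCreate()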

I do not understand why it takes so long for just 20 million records. I know that computing the weighted moving average requires roughly 20M X 250 (the window size) = 5 billion averages and divisions, but with 16 cores (first run) I don't see why it would take so long. I can't imagine how long it would take for the remaining 479 feature columns!

I have also tried increasing the default number of shuffle partitions by setting:

  • spark.sql.shuffle.partitions 1000

but even with 1000 partitions it didn't bring the time down. I also tried sorting the data by ID and DATE before calling the window aggregation, without any benefit.
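A sketch of these two attempts (shuffle partitions and pre-sorting), reusing data and window from above:

// Sketch only: raise the shuffle partition count, then sort before the window aggregation.
sparkSession.conf.set("spark.sql.shuffle.partitions", "1000")
val sorted = data.sort("ID", "DATE")
sorted.withColumn("Feature_1", col("Feature_1").divide(avg("Feature_1").over(window)))
  .write.parquet("s3://data-out")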

Is there any way to improve this, or do window functions generally run slowly for my use case? This is only 20M rows, nowhere near what Spark can process with other types of workloads.


1 Answer

0 votes

Your dataset size is approximately 70 GB. If I understood it correctly, for each ID it sorts all the records by date and then takes the preceding 250 records to compute the average. As you need to apply this to more than 400 columns, I would recommend trying bucketing during parquet creation to avoid the shuffling. Writing the bucketed parquet file takes a considerable amount of time, but deriving all 480 columns should then not take 8 minutes x 480 of execution time.
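Something along these lines is what I mean by bucketing at parquet-creation time; the bucket count, table name and output path are just examples, not something from your setup:

// Sketch: write the source data bucketed (and sorted) by ID so the window
// aggregation can reuse that layout. Bucketing requires saveAsTable.
sparkSession.read.parquet("s3://data-location")
  .write
  .bucketBy(200, "ID")
  .sortBy("ID", "DATE")
  .option("path", "s3://data-bucketed")
  .saveAsTable("features_bucketed")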

Please try bucketing, or repartition plus sortWithinPartitions, while creating the parquet file and let me know if it works.
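The repartition-and-sort variant would look roughly like this (again just a sketch, the output path is an example):

// Sketch: repartition by ID and sort within partitions by ID and DATE before writing,
// so the data layout already matches the window's partitioning and ordering.
// col comes from org.apache.spark.sql.functions.
sparkSession.read.parquet("s3://data-location")
  .repartition(col("ID"))
  .sortWithinPartitions("ID", "DATE")
  .write
  .parquet("s3://data-repartitioned")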