1
votes

I have a DataFrame with the following schema:

root
 |-- id_1: long (nullable = true)
 |-- id_2: long (nullable = true)
 |-- score: double (nullable = true)

The data looks like this:

+----+----+------------------+
|id_1|id_2|score             |
+----+----+------------------+
|0   |9   |0.5888888888888889|
|0   |1   |0.6166666666666667|
|0   |2   |0.496996996996997 |
|1   |9   |0.6222222222222221|
|1   |6   |0.9082996632996633|
|1   |5   |0.5927450980392157|
|2   |3   |0.665774107440774 |
|3   |8   |0.6872367465504721|
|3   |8   |0.6872367465504721|
|5   |6   |0.5365909090909091|
+----+----+------------------+

The goal is to find, for each id_1, the id_2 with the max score. Maybe I'm wrong, but it seems I just need to create a paired RDD:

root
 |-- _1: long (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- _1: long (nullable = true)
 |    |-- _2: double (nullable = true)

+---+----------------------+
|_1 |_2                    |
+---+----------------------+
|0  |[9,0.5888888888888889]|
|0  |[1,0.6166666666666667]|
|0  |[2,0.496996996996997] |
|1  |[9,0.6222222222222221]|
|1  |[6,0.9082996632996633]|
|1  |[5,0.5927450980392157]|
|2  |[3,0.665774107440774] |
|3  |[8,0.6872367465504721]|
|3  |[8,0.6872367465504721]|
|5  |[6,0.5365909090909091]|
+---+----------------------+

and reduce by key, keeping the max. Something like:

paired_rdd.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))
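
A minimal PySpark sketch of the whole paired-RDD route, assuming original_df matches the schema above and an active SparkSession (the variable names are mine):

# build (id_1, (id_2, score)) pairs from the DataFrame's underlying RDD
paired_rdd = original_df.rdd.map(lambda row: (row.id_1, (row.id_2, row.score)))

# keep, per id_1, the (id_2, score) pair with the highest score
best = paired_rdd.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))

# flatten back to (id_1, id_2, score) rows if a DataFrame is needed again
best_df = best.map(lambda kv: (kv[0], kv[1][0], kv[1][1])).toDF(["id_1", "id_2", "score"])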

Or the same with the DataFrame API (without a paired RDD):

original_df.groupBy('id_1').max('score')
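
Note that groupBy('id_1').max('score') alone returns only the max score, not the matching id_2. One DataFrame-only way to keep id_2 (my own sketch, not what I actually ran) is to aggregate on a struct whose first field is the score:

from pyspark.sql import functions as F

# max() on a struct compares fields left to right, so putting score first
# keeps the highest score and carries its id_2 along
best_df = (original_df
           .groupBy("id_1")
           .agg(F.max(F.struct("score", "id_2")).alias("best"))
           .select("id_1", "best.id_2", "best.score"))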

I have two questions, and I would appreciate it if someone could point out my wrong steps.

  1. For 1 billion or even 100 billion records: what are the best practices to achieve the goal (find the id_2 with the max score for each id_1)? I've tried with 50 million and 100 million records and got better results by shuffling the data (which is the opposite of what Holden Karau suggests). I repartitioned by id_1

    .repartition(X, "id_1")

    then ran reduceByKey, and it was faster. Why?

  2. Why was the DataFrame API several times slower than the RDD API? Where am I wrong?

Thanks.

2
How is original_df partitioned? – Raphael Roth
@RaphaelRoth It depends on the number of cores, I believe (yes, the rule of thumb is 128 MB per partition, but this isn't text data and doesn't consume much memory). I tried on 30 cores with 30 partitions and got the best timing. With 60 or 120 partitions it was worse, and with fewer than 30 it was worse too, because not all cores were loaded. – SirJ
Maybe it's not the RDD API that is faster but just the fact that you repartitioned the data. If your original DataFrame has too few partitions, then the groupBy of the DataFrame API will not make proper use of your cluster resources (because you don't have enough parallelism). – Raphael Roth
@RaphaelRoth All executors/cores were busy, so I believe parallelism was sufficient. How can I check it? I'll post Spark UI screenshots soon with the same data but different numbers of partitions, RDD vs DataFrame. Here's a YouTube link with timings, youtu.be/V6DkTVvy9vk?t=20m26s, which shows a bar chart of RDD vs DataFrame speed. – SirJ

2 Answers

4
votes

Your use case is a perfect fit for window aggregate functions. Give them a try and see how they compare to RDD's reduceByKey.


It's sometimes not about whether an RDD-based pipeline is faster than a DataFrame-based one, but about how expressive one is versus the other. A DataFrame-based pipeline is almost always more expressive (and perhaps more maintainable in the long run) than an RDD-based alternative.


(I'm using Scala and leave converting the code to Python as an exercise.)

scala> dataset.show
+----+----+------------------+
|id_1|id_2|             score|
+----+----+------------------+
|   0|   9|0.5888888888888889|
|   0|   1|0.6166666666666667|
|   0|   2| 0.496996996996997|
|   1|   9|0.6222222222222221|
|   1|   6|0.9082996632996633|
|   1|   5|0.5927450980392157|
|   2|   3| 0.665774107440774|
|   3|   8|0.6872367465504721|
|   3|   8|0.6872367465504721|
|   5|   6|0.5365909090909091|
+----+----+------------------+

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val byId_1 = Window.partitionBy("id_1")
// i.e. compute max("score") over byId_1 alongside every row
scala> dataset.
  select($"id_1", $"id_2", $"score", max("score") over byId_1 as "max_score").
  filter($"score" === $"max_score").
  distinct.  // <-- id_1 == 3 is duplicated
  sort("id_1").
  show
+----+----+------------------+------------------+
|id_1|id_2|             score|         max_score|
+----+----+------------------+------------------+
|   0|   1|0.6166666666666667|0.6166666666666667|
|   1|   6|0.9082996632996633|0.9082996632996633|
|   2|   3| 0.665774107440774| 0.665774107440774|
|   3|   8|0.6872367465504721|0.6872367465504721|
|   5|   6|0.5365909090909091|0.5365909090909091|
+----+----+------------------+------------------+
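
A rough PySpark equivalent of the window query above (the "home exercise"), assuming the same dataset DataFrame:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

by_id_1 = Window.partitionBy("id_1")

(dataset
 .withColumn("max_score", F.max("score").over(by_id_1))
 .filter(F.col("score") == F.col("max_score"))
 .distinct()               # id_1 == 3 is duplicated
 .orderBy("id_1")
 .show(truncate=False))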

Please note that by default DataFrames use spark.sql.shuffle.partitions, which is 200. I had a case last week where most of the partitions (and hence tasks) were empty, which left thousands of tasks waiting for execution for nothing and burning CPU cycles. We went from hours to seconds.
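
The setting can be changed per session, and a DataFrame's current partitioning is easy to inspect (PySpark syntax; the value 128 below is only an illustration, not a recommendation):

# default is 200; tune it to roughly match the cluster's useful parallelism
spark.conf.set("spark.sql.shuffle.partitions", "128")

# check how many partitions a DataFrame currently has
print(original_df.rdd.getNumPartitions())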

Knowing your data and how it should be partitioned is the very first step in optimizing your Spark queries, whether they are written with the RDD API or the Dataset API.

2
votes

Thanks, Jacek, for the interesting suggestion.

I performed some tests on 4 × c4.8xlarge servers (128 cores, 192 GB RAM; I hope 32 workers and partitioning = 128 is a good fit for this setup), using a dataset with 1,368,598,093 records.

  1. "Window" solution - about 43min and generated about 31GB shuffle (15.4GB shuffle write and 15.4GB shuffle read then). See stage #25. enter image description here
  2. Solution with reduceByKey without repartitioning by id - 40min and 8.4MB shuffle (4.2MB shuffle write and 4.2MB shuffle read then) See the stage #22 enter image description here
  3. And the winner - reduceByKey with repartitioning by id. 22min and 15GB shuffle (7.5GB shuffle write and 7.5GB shuffle read then) See the stage #24 enter image description here
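
A PySpark sketch of that winning variant, reconstructed from the description above (the partition count 128 and the variable names are my assumptions, not taken from the actual job):

best = (original_df
        .repartition(128, "id_1")                             # co-locate all rows for an id_1
        .rdd
        .map(lambda row: (row.id_1, (row.id_2, row.score)))   # (id_1, (id_2, score)) pairs
        .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1])))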

I believe that if I process 200B records, the shuffle will cause some IO trouble, and it would be better not to repartition by a column (because of the shuffle), but I don't know how to improve the speed without it. Unfortunately, StackOverflow cannot give me the correct answer. :(

Thank you guys for the interesting suggestions!