I am asking this question because if I specify repartition as 5, than all my data(>200Gigs) are moved to 5 different executors and 98% of the resources is unused. and then the partitionBy is happening which is again creating a lot of shuffle. Is there a way that first the partitionBy happens and then repartition runs on the data?
2 Answers
Although the question is not entirely easy to follow, the following aligns with the other answer and this approach should avoid the issues mentioned on unnecessary shuffling:
val n = [... some calculation for number of partitions / executors based on cluster config and volume of data to process ...]
df.repartition(n, $"field_1", $"field_2", ...)
.sortWithinPartitions("fieldx", "field_y")
.write.partitionBy("field_1", "field_2", ...)
.format("location")
whereby [field_1, field_2, ...] are the same set of fields for repartition and partitionBy.
You can use repartition(5, col("$colName"))
.
Thus when you will make partitionBy("$colName")
you will skip shuffle for '$colName'
since it's already been repartitioned.
Also consider to have as many partitions as the product of the number of executors by the number of used cores by 3 (this may vary between 2 and 4 though).
So as we know, Spark can only run 1 concurrent task for every partition of an RDD. Assuming you have 8 cores per executor and 5 executors:
You need to have: 8 * 5 * 3 = 120 partitions