1
votes

I'm trying to understand how Spark partitions data. Suppose I have an execution DAG like that in the picture (orange boxes are the stages). The two groupBy and the join operations are supposed to be very heavy if the RDD's are not partitioned.

Is it wise then to use .partitionBy(new HashPartitioner(properValue)) on P1, P2, P3 and P4 to avoid shuffling? What's the cost of partitioning an existing RDD? When isn't it appropriate to partition an existing RDD? Doesn't Spark partition my data automatically if I don't specify a partitioner?

Thank you


2 Answers

8
votes

tl;dr The answers to your questions respectively: Better to partition at the outset if you can; Probably less than not partitioning; Your RDD is partitioned one way or another anyway; Yes.

This is a pretty broad question. It takes up a good portion of our course! But let's try to address as much about partitioning as possible without writing a novel.

As you know, the primary reason to use a tool like Spark is because you have too much data to analyze on one machine without having the fan sound like a jet engine. The data get distributed among all the cores on all the machines in your cluster, so yes, there is a default partitioning--according to the data. Remember that the data are distributed already at rest (in HDFS, HBase, etc.), so Spark just partitions according to the same strategy by default to keep the data on the machines where they already are--with the default number of partitions equal to the number of cores on the cluster. You can override this default number by configuring spark.default.parallelism, and you generally want 2-3 partitions per core.
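For instance, here is a minimal sketch of overriding that default; the app name and the value 48 are just placeholders (assuming roughly 16 cores times 3):

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical configuration: aim for ~2-3 partitions per core.
    val conf = new SparkConf()
      .setAppName("partitioning-demo")             // placeholder app name
      .set("spark.default.parallelism", "48")      // e.g. 16 cores * 3 (assumed cluster size)
    val sc = new SparkContext(conf)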

However, typically you want data that belong together (for example, data with the same key, where HashPartitioner would apply) to be in the same partition, regardless of where they are to start, for the sake of your analytics and to minimize shuffle later. Spark also offers a RangePartitioner, or you can roll your own for your needs fairly easily. But you are right that there is an upfront shuffle cost to go from default partitioning to custom partitioning; it's almost always worth it.
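For example, a sketch of co-partitioning two pair RDDs before a join; the RDD names and the partition count of 8 are illustrative, and sc is assumed to already exist (e.g. in spark-shell):

    import org.apache.spark.HashPartitioner

    // Hypothetical key-value RDDs standing in for two of the inputs in the DAG.
    val left  = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
    val right = sc.parallelize(Seq((1, "x"), (2, "y")))

    // Co-partition both sides up front and cache them; the join can then
    // use the existing partitioner instead of shuffling both inputs again.
    val partitioner = new HashPartitioner(8)
    val leftPart  = left.partitionBy(partitioner).cache()
    val rightPart = right.partitionBy(partitioner).cache()

    val joined = leftPart.join(rightPart)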

It is generally wise to partition at the outset (rather than delay the inevitable with a later partitionBy) and then repartition if needed afterwards. Later on you may even choose to coalesce, which by default avoids a full shuffle, to reduce the number of partitions and potentially leave some machines and cores idle, because the gain in network IO (after that upfront cost) is greater than the loss of CPU power.
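A rough sketch of the difference (the partition counts are arbitrary):

    // Narrowing the partition count after the heavy work is done.
    val wide = sc.parallelize(1 to 1000000, numSlices = 200)

    val narrowed   = wide.coalesce(20)    // default shuffle = false: no full shuffle,
                                          // but partitions may end up unevenly sized
    val rebalanced = wide.repartition(20) // always shuffles, evenly sized partitions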

(The only situation I can think of where you don't partition at the outset--because you can't--is when your data source is an unsplittable compressed file, such as gzip, which has to be read into a single partition.)

Note also that a plain map drops the partitioner; you can preserve partitioning during a map-like transformation with mapValues, or with mapPartitions and mapPartitionsWithIndex by passing preservesPartitioning = true (as long as you don't change the keys).
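A small sketch of the difference (the RDD and its contents are made up; sc assumed):

    import org.apache.spark.HashPartitioner

    // A hypothetical pair RDD, hash-partitioned by key.
    val pairs = sc.parallelize(Seq((1, 10), (2, 20), (3, 30)))
      .partitionBy(new HashPartitioner(4))

    val lost  = pairs.map { case (k, v) => (k, v + 1) }   // partitioner is dropped
    val kept1 = pairs.mapValues(_ + 1)                    // partitioner is preserved
    val kept2 = pairs.mapPartitions(
      iter => iter.map { case (k, v) => (k, v + 1) },
      preservesPartitioning = true)                       // preserved, since keys are unchanged

    // lost.partitioner == None; kept1.partitioner and kept2.partitioner are still defined.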

Finally, keep in mind that as you experiment with your analytics while you work your way up to scale, there are diagnostic capabilities you can use (a quick sketch follows the list):

  • toDebugString to see the lineage of RDDs
  • getNumPartitions to, shockingly, get the number of partitions
  • glom to see clearly how your data are partitioned
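For example, all three on a toy RDD (sc assumed, e.g. in spark-shell):

    val rdd = sc.parallelize(1 to 10, numSlices = 4)

    println(rdd.toDebugString)       // lineage of the RDD, one line per dependency
    println(rdd.getNumPartitions)    // 4
    rdd.glom().collect()             // one array per partition
      .zipWithIndex
      .foreach { case (part, i) => println(s"partition $i: ${part.mkString(", ")}") }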

And if you pardon the shameless plug, these are the kinds of things we discuss in Analytics with Apache Spark. We hope to have an online version soon.

1
votes

By applying partitionBy preemptively you don't avoid the shuffle; you just push it to another place. This can be a good idea if the partitioned RDD is reused multiple times, but you gain nothing for a one-off join.
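A sketch of the distinction (the RDDs and their contents are made up; sc assumed):

    import org.apache.spark.HashPartitioner

    // Hypothetical key-value RDDs.
    val users  = sc.parallelize(Seq((1, "alice"), (2, "bob")))
    val orders = sc.parallelize(Seq((1, 9.99), (2, 4.50)))
    val clicks = sc.parallelize(Seq((1, "home"), (2, "cart")))

    // The shuffle of `users` happens once, inside partitionBy; caching keeps
    // the partitioned copy around so both joins below can reuse it.
    val usersPart = users.partitionBy(new HashPartitioner(8)).cache()

    val withOrders = usersPart.join(orders)   // worthwhile: partitioning is reused
    val withClicks = usersPart.join(clicks)   // ...and reused again

    // For a single join of `users` with `orders`, calling partitionBy first
    // would just move the same shuffle earlier without saving anything.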

Doesn't Spark partition my data automatically if I don't specify a partitioner?

It will partition (a.k.a. shuffle) your data as part of the join and the subsequent groupBy (unless you keep the same key and use a transformation which preserves partitioning).
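A small sketch of that last point (the inputs are made up; sc assumed): the join's output keeps a partitioner, so a partition-preserving transformation followed by a per-key aggregation needs no second shuffle.

    // Hypothetical key-value inputs.
    val rdd1 = sc.parallelize(Seq((1, 2.0), (2, 3.0)))
    val rdd2 = sc.parallelize(Seq((1, 1.0), (2, 5.0)))

    val joined = rdd1.join(rdd2)                              // shuffles both inputs by key
    val summed = joined.mapValues { case (a, b) => a + b }    // keeps the join's partitioner
    val byKey  = summed.reduceByKey(_ + _)                    // reuses it: no extra shuffle

    // A plain .map over `joined` would drop the partitioner and force
    // reduceByKey to shuffle again.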