Is there a relationship between the Partitions of an RDD and the Buckets which the contents of the RDD get mapped to before a shuffle operation?
If you are asking about bucketed tables (i.e. after bucketBy and spark.table("bucketed_table")), I think the answer is yes.
Let me show you what I mean.
scala> val large = spark.range(1000000)
scala> println(large.queryExecution.toRdd.getNumPartitions)
8
scala> large.write.bucketBy(4, "id").saveAsTable("bucketed_4_id")
18/04/18 22:00:58 WARN HiveExternalCatalog: Persisting bucketed data source table `default`.`bucketed_4_id` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
scala> println(spark.table("bucketed_4_id").queryExecution.toRdd.getNumPartitions)
4
In other words, the number of partitions of a loaded bucketed table is exactly the number of buckets you defined while saving it.
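You can also double-check the bucketing metadata recorded for the table, e.g. with DESCRIBE EXTENDED; the row labels 'Num Buckets' and 'Bucket Columns' are what I see in Spark 2.3 and may differ slightly in other versions:

// Quick sanity check on the table's bucketing metadata.
spark.sql("DESCRIBE EXTENDED bucketed_4_id")
  .where("col_name IN ('Num Buckets', 'Bucket Columns')")
  .show(false)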
Secondly, will all key-value pairs with the same key be shuffled to the same bucket, or is the distribution of key-value pairs to buckets random?
Spark 2.3 (and I believe earlier versions behave alike) does bucketing per partition (i.e. per writer task), so every partition writes its own set of the buckets you defined.
In the above case you end up with 8 (partitions) x 4 (buckets) = 32 bucket files, which, together with the _SUCCESS file and the total line that ls prints, gives the 34 lines below.
$ ls -ltr spark-warehouse/bucketed_4_id | wc -l
34
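If you want to see how those 32 files break down per bucket, here is a minimal sketch, assuming the default file naming in which the bucket id appears as the _0000N suffix right before the file extension (e.g. part-00000-...-_00003.c000.snappy.parquet):

import java.io.File

// List the data files of the bucketed table and group them by the bucket id
// encoded in the file name (assumption: the _0000N suffix before the extension).
val tableDir = new File("spark-warehouse/bucketed_4_id")
val bucketSuffix = """_(\d{5})\.""".r

val filesPerBucket = tableDir
  .listFiles()
  .map(_.getName)
  .filter(_.startsWith("part-"))
  .flatMap(name => bucketSuffix.findFirstMatchIn(name).map(_.group(1).toInt))
  .groupBy(identity)
  .map { case (bucket, names) => bucket -> names.length }

// With 8 writer tasks and 4 buckets, every bucket id should show up 8 times.
filesPerBucket.toSeq.sorted.foreach { case (bucket, count) =>
  println(s"bucket $bucket -> $count files")
}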
Does specifying a partitioner (hash/range) have any effect on this distribution?
I think so, since a partitioner is what Spark uses to decide, based on the key, which partition a record goes to.
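As a small illustration, here is a sketch using the RDD-side HashPartitioner (an assumption for illustration only: Spark SQL's bucketBy uses its own Murmur3-based hash internally, so the actual bucket ids for the table above would differ, but the principle is the same): getPartition depends only on the key, so pairs with the same key always land in the same bucket.

import org.apache.spark.HashPartitioner

// getPartition is a pure function of the key, so pairs with the same key
// always map to the same bucket/partition id.
val partitioner = new HashPartitioner(4)

Seq(("a", 1), ("a", 2), ("b", 3)).foreach { case (k, v) =>
  println(s"key=$k value=$v -> bucket ${partitioner.getPartition(k)}")
}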
Comments:

HashShuffleWriter implementation: you'll see there is pretty much no distinction. bucketId is determined using partitioner.getPartition on the key. - zero323

So the records: Iterator[Product2[K, V]] that is passed to the write() method of the HashShuffleWriter contains the actual content of the buckets? - jaywalker