I don't think HashPartitioner will ever put records with the same key into two different partitions. The Javadoc for Partitioner clearly states the following:
An object that defines how the elements in a key-value pair RDD are partitioned by key. Maps each key to a partition ID, from 0 to numPartitions - 1.
Note that, partitioner must be deterministic, i.e. it must return the same partition id given the same partition key.
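To make that determinism concrete, here is a minimal sketch (assuming an existing SparkContext named sc, e.g. from the pyspark shell) showing that hash partitioning an RDD by key keeps duplicate keys together:

# a minimal sketch, assuming an existing SparkContext named `sc`
pairs = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('c', 4), ('a', 5)])
# partitionBy hashes each key to a partition id, so every record with key 'a'
# is guaranteed to land in the same partition.
partitioned = pairs.partitionBy(4)
print(partitioned.glom().collect())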
If putting records with the same key into the same partition is not a requirement for you, you could try the following approach without implementing a custom partitioner.
- Let's say you want to write the dataframe out as 1000 files.
- Add a new column to your dataframe with random integers between 0 and 999.
from pyspark.sql.functions import rand, round
from pyspark.sql.types import IntegerType
_num_output_files = 1000
df = df.withColumn('rand', round(rand() * (_num_output_files - 1), 0).astype(IntegerType()))
Without loss of generality, let's assume the rand column is the i-th column in the dataframe. We need to use that column as the key for the RDD and then partition by that key. This ensures an almost uniform distribution of data across all partitions. The following code snippet achieves that.
# Key each row by the i-th column (the 'rand' column), then partition by that key.
# The identity partitionFunc sends key k straight to partition k.
tmp_rdd = df.rdd.keyBy(lambda x: x[i - 1])
tmp_rdd = tmp_rdd.partitionBy(_num_output_files, lambda x: x)
df_rdd = spark.createDataFrame(tmp_rdd.map(lambda x: x[1]))
Note: This is a handy code snippet to check the current distribution of records across partitions in PySpark: print('partition distrib: ' + str(df_rdd.rdd.glom().map(len).collect())). After calling the previous set of methods you should see roughly the same number of records in each partition.
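For completeness, here is a hedged sketch of the final write; the output path, the Parquet format, and dropping the helper rand column are assumptions on my part, not part of the snippets above. Each non-empty partition is written by one task as one file, which gives roughly _num_output_files output files:

# Sketch only: path, format, and dropping the 'rand' helper column are assumptions.
print('partition distrib: ' + str(df_rdd.rdd.glom().map(len).collect()))
# Each partition is written by one task, producing one output file per partition.
df_rdd.drop('rand').write.mode('overwrite').parquet('/tmp/output_1000_files')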