
I'm trying to pre-partition my data before running an aggregation over a certain column. I have 3 worker nodes, and I would like each partition to hold non-overlapping values of the column I am partitioning on. In other words, no two partitions should contain the same value in that column.

For example, if I have the following data:

ss_item_sk | ss_quantity
1          | 10.0
1          |  4.0
2          |  3.0
3          |  5.0
4          |  8.0
5          |  13.0
5          |  10.0

Then the following partitions are satisfactory:

partition 1

ss_item_sk | ss_quantity
1          | 10.0
1          |  4.0

partition 2

ss_item_sk | ss_quantity
2          |  3.0
3          |  5.0

partition 3

ss_item_sk | ss_quantity
4          |  8.0
5          |  13.0
5          |  10.0

Unfortunately, the code I have below does not work.

spark.sqlContext.setConf( "spark.sql.shuffle.partitions", "3")
var json = spark.read.json("hdfs://master:9000/tpcds/store_sales")
var filtered = json.filter(row => row.getAs[Long]("ss_item_sk") < 180)

I have already looked at

and I still can't figure it out.

Try using `filtered.write.partitionBy("ss_item_sk").json(savepath)`. The outcome is one folder for each distinct value of ss_item_sk, and each folder contains only the rows for that ss_item_sk value. - kaysush
@kaysush - why won't `repartition` work here? As I understand it, `repartition` partitions on the key `ss_item_sk`, so that rows with the same key end up in the same partition. Do you know why `repartition()` is not working for Anny? - cph_sto
What is not working? Before the write, or after, i.e. when writing to file? Did you try `explain`? I assume your example of "satisfactory" means that 1,5 | 2 | 3,4 would also be OK? - thebluephantom

1 Answer


Repartitioning by key distributes the data by that key at the DataFrame level, whereas writing a DataFrame to HDFS is a separate step. You can try writing the DataFrame with `partitionBy` on that column.
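
A minimal sketch of such a write, assuming a local Spark session and the sample rows from the question in place of `filtered`. The object name, the `local[3]` master, and the output path `/tmp/store_sales_by_item` are illustrative assumptions, not taken from the original answer:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionByWriteSketch {
  // Sample rows copied from the question; this stands in for `filtered`.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1L, 10.0), (1L, 4.0), (2L, 3.0), (3L, 5.0),
      (4L, 8.0), (5L, 13.0), (5L, 10.0)
    ).toDF("ss_item_sk", "ss_quantity")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partitionBy-write-sketch")
      .master("local[3]") // assumption: local run with 3 cores
      .getOrCreate()

    val savePath = "/tmp/store_sales_by_item" // hypothetical output path

    // partitionBy takes column names as strings and creates one
    // sub-directory per distinct value, e.g. <savePath>/ss_item_sk=1/
    sample(spark).write
      .mode("overwrite")
      .partitionBy("ss_item_sk")
      .json(savePath)

    spark.stop()
  }
}
```

Each `ss_item_sk=<value>` directory then holds only the rows for that value, but it may contain more than one part file, because every in-memory partition that holds rows for a value writes its own file.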


In this scenario you will also see multiple part files in each of the directories created for the partition column. The number of writers/reducers that run can only be controlled through the "partitionBy" method. It is very similar to a MapReduce Partitioner, in that it controls how many reducers run. To get a single file per value of the partition column, you have to run this command.
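
One way this could look, as a hedged sketch rather than the answer's exact command: repartition the DataFrame by the key column first, so that all rows for a given key sit in one in-memory partition, and then write with `partitionBy`. The helper `maxPartitionsPerKey`, the object name, the sample rows, and the output path are assumptions made for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, countDistinct, max, spark_partition_id}

object RepartitionByKeySketch {
  // Hypothetical helper: the largest number of distinct in-memory
  // partitions that any single key value is spread over.
  def maxPartitionsPerKey(df: DataFrame, key: String): Long =
    df.withColumn("pid", spark_partition_id())
      .groupBy(col(key))
      .agg(countDistinct(col("pid")).as("n"))
      .agg(max(col("n")))
      .head()
      .getLong(0)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("repartition-by-key-sketch")
      .master("local[3]") // assumption: local run with 3 cores
      .getOrCreate()
    import spark.implicits._

    // Sample rows from the question; stands in for `filtered`.
    val filtered = Seq(
      (1L, 10.0), (1L, 4.0), (2L, 3.0), (3L, 5.0),
      (4L, 8.0), (5L, 13.0), (5L, 10.0)
    ).toDF("ss_item_sk", "ss_quantity")

    // Hash-partition into 3 partitions on the key column, so rows
    // with the same key always land in the same partition.
    val byKey = filtered.repartition(3, col("ss_item_sk"))

    // Should print 1 when no key spans two partitions.
    println(maxPartitionsPerKey(byKey, "ss_item_sk"))

    val savePath = "/tmp/store_sales_single_file" // hypothetical output path
    byKey.write.mode("overwrite").partitionBy("ss_item_sk").json(savePath)

    spark.stop()
  }
}
```

Hash partitioning only guarantees that equal keys share a partition. It does not guarantee that the keys are spread evenly, so with few distinct keys some of the 3 partitions may be empty.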


This works because each reducer is now mapped to one of the executor partitions. Hope this helps.