I'm trying to pre-partition my data before running an aggregation across a certain column. I have 3 worker nodes, and I would like each partition to have non-overlapping values in the column I am partitioning on, i.e. no two partitions should ever contain the same value of that column.
For example, if I have the following data:
ss_item_sk | ss_quantity
1 | 10.0
1 | 4.0
2 | 3.0
3 | 5.0
4 | 8.0
5 | 13.0
5 | 10.0
Then the following partitions are satisfactory:
partition 1
ss_item_sk | ss_quantity
1 | 10.0
1 | 4.0
partition 2
ss_item_sk | ss_quantity
2 | 3.0
3 | 5.0
partition 3
ss_item_sk | ss_quantity
4 | 8.0
5 | 13.0
5 | 10.0
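For experimenting locally, here is a minimal sketch of the example data as a DataFrame (column names taken from the table above; a SparkSession named spark is assumed):
import spark.implicits._

// the seven example rows from above
val toy = Seq(
  (1L, 10.0), (1L, 4.0), (2L, 3.0), (3L, 5.0),
  (4L, 8.0), (5L, 13.0), (5L, 10.0)
).toDF("ss_item_sk", "ss_quantity")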
Unfortunately, the code I have below does not work.
import spark.implicits._  // needed for the $"col" column syntax

spark.sqlContext.setConf("spark.sql.shuffle.partitions", "3")
val json = spark.read.json("hdfs://master:9000/tpcds/store_sales")
val filtered = json.filter(row => row.getAs[Long]("ss_item_sk") < 180)
// hash-partition on ss_item_sk so rows with equal keys land in the same partition
filtered.repartition($"ss_item_sk").write.json(savepath)
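One way to inspect what the repartition actually produced (a sketch, using spark_partition_id() from org.apache.spark.sql.functions): tag each row with the partition it landed in, then list the distinct ss_item_sk values per partition. If the hash partitioning behaves as intended, no key should appear under two partition ids.

import org.apache.spark.sql.functions.{spark_partition_id, collect_set}

// explicitly ask for 3 partitions, hashed on ss_item_sk
filtered.repartition(3, $"ss_item_sk")
  .withColumn("pid", spark_partition_id())
  .groupBy("pid")
  .agg(collect_set($"ss_item_sk").as("item_keys"))
  .show(false)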
I have already looked at
- How to define partitioning of DataFrame?
- Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?
- pyspark: Efficiently have partitionBy write to same number of total partitions as original table
and I still can't figure it out.
filtered.write.partitionBy("ss_item_sk").json(savepath)
The outcome is that you'll have a folder for each distinct value of ss_item_sk, and within each folder there will only be rows pertaining to that ss_item_sk value. – kaysush
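A sketch of what that write produces and how it reads back (savepath is the same placeholder path as above; note that DataFrameWriter.partitionBy takes column names as strings, not Column objects):

// Output layout after the partitionBy write, one folder per distinct key:
//   savepath/ss_item_sk=1/part-....json
//   savepath/ss_item_sk=2/part-....json
//   ...
// Reading the base path recovers ss_item_sk from the folder names via partition discovery.
val reread = spark.read.json(savepath)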