
I'm trying to pre-partition the data before doing an aggregation across a certain column of my data. I have 3 worker nodes and I would like each partition to have non-overlapping values in the column I am partitioning on. I don't want a situation where two partitions might contain the same value of that column.

e.g. If I have the following data

ss_item_sk | ss_quantity
1          | 10.0
1          |  4.0
2          |  3.0
3          |  5.0
4          |  8.0
5          |  13.0
5          |  10.0

Then the following partitions are satisfactory:

partition 1

ss_item_sk | ss_quantity
1          | 10.0
1          |  4.0

partition 2

ss_item_sk | ss_quantity
2          |  3.0
3          |  5.0

partition 3

ss_item_sk | ss_quantity
4          |  8.0
5          |  13.0
5          |  10.0

Unfortunately, the code I have below does not work.

// import needed for the $"colname" column syntax used below
import spark.implicits._

spark.sqlContext.setConf("spark.sql.shuffle.partitions", "3")
var json = spark.read.json("hdfs://master:9000/tpcds/store_sales")
var filtered = json.filter(row => row.getAs[Long]("ss_item_sk") < 180)
filtered.repartition($"ss_item_sk").write.json(savepath)

I have already looked at related questions and I still can't figure it out.
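
For reference, one way to check where each key actually lands after the repartition is spark_partition_id() from org.apache.spark.sql.functions (a minimal sketch against the filtered DataFrame above; the pid / num_partitions column names are just illustrative):

import org.apache.spark.sql.functions.{spark_partition_id, countDistinct}

// Tag each row with the id of the partition it lands in after the repartition,
// then count how many distinct partitions each ss_item_sk value is spread across.
filtered
  .repartition($"ss_item_sk")
  .withColumn("pid", spark_partition_id())
  .groupBy("ss_item_sk")
  .agg(countDistinct("pid").as("num_partitions"))
  .show()

Since repartition on a column hash-partitions by that column, every ss_item_sk value should report num_partitions = 1 here.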

Try using filtered.write.partitionBy("ss_item_sk").json(savepath). The outcome is that you'll have a folder for each distinct value of ss_item_sk, and within each folder there will only be rows pertaining to that ss_item_sk value. – kaysush
@kaysush - why won’t ‘repartition’ work here? As I understand it, ‘repartition’ will partition on the basis of the key ‘ss_item_sk’ so that the same keys end up in the same partition. Do you know why ‘repartition()’ is not working for Anny? – cph_sto
What exactly is not working: the partitioning itself, or the result after writing to file? Did you try explain? Your example of a satisfactory result implies that the grouping {1,5}, {2}, {3,4} would also be OK, I assume? – thebluephantom

1 Answer


Repartitioning by key distributes the data based on that key at the DataFrame level, while writing the DataFrame to HDFS is a separate thing. You can try

df.coalesce(1).write.partitionBy("ss_item_sk").json(savepath)
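
With the sample data above, that write produces one sub-directory per distinct ss_item_sk value under savepath, along these lines (part-file names and counts are placeholders; they depend on the writing tasks):

savepath/
  ss_item_sk=1/part-....json
  ss_item_sk=2/part-....json
  ...
  ss_item_sk=5/part-....json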

In this scenario as well, you will see multiple part files inside the different directories created by the partition column. The number of writers/reducers that run can only be controlled through the "partitionBy" method; it is very similar to a MapReduce Partitioner in that it controls how many reducers run. To get a single file per value of the partition column, you have to run this command.

df.repartition($"ss_item_sk").write.partitionBy("ss_item_sk").json(savepath)

Now this works because the reducers are mapped to the partitions produced by the repartition, so each output directory gets a single file. Hope this helps.
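
As a sanity check, one way to confirm the layout is to read the output back and count how many distinct part files each ss_item_sk value is spread over (a sketch assuming the same savepath; input_file_name and countDistinct come from org.apache.spark.sql.functions, and the file / num_files column names are just illustrative):

import org.apache.spark.sql.functions.{input_file_name, countDistinct}

// Read the partitioned output back; ss_item_sk is recovered from the directory names.
val reloaded = spark.read.json(savepath)

// Every ss_item_sk value should map to exactly one part file / directory.
reloaded
  .withColumn("file", input_file_name())
  .groupBy("ss_item_sk")
  .agg(countDistinct("file").as("num_files"))
  .show()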