
Data Looks Like:

col 1 col 2 col 3 col 4
row 1 row 1 row 1 row 1
row 2 row 2 row 2 row 2
row 3 row 3 row 3 row 3
row 4 row 4 row 4 row 4
row 5 row 5 row 5 row 5
row 6 row 6 row 6 row 6

Problem: I want to partition this data so that, say, rows 1 and 2 are processed as one partition, rows 3 and 4 as another, and rows 5 and 6 as another, and then create JSON data in which each row's values are merged with the column headers.

Output should be like:
[
{col1:row1,col2:row1,col3:row1,col4:row1},
{col1:row2,col2:row2,col3:row2,col4:row2},
{col1:row3,col2:row3,col3:row3,col4:row3},
{col1:row4,col2:row4,col3:row4,col4:row4},...
]

I tried using repartition(num), which is available in Spark, but it does not partition the data exactly as I want, so the JSON data generated is not valid. I previously had an issue where my program took the same time to process the data even though I was using different numbers of cores, which can be found here; repartition was suggested there by @Patrick McGloin. The code mentioned in that question is what I am trying to do.

I don't think you mean partition, because you can generate your JSON from a single RDD without particular concern for partitioning. If you know the array position that your keys need to be applied to, you can use Scala's zip in an RDD map call. - Alister Lee
I have used the zip feature to combine the header with each row to create JSON data, but the incoming data is a microbatch of around 10000 rows. It has to be processed in less than 10 minutes, so I need more partitions in order to use more cores. If I use a single RDD without partitioning it, the task takes around 45 minutes, which is not acceptable because new data arrives every 10 minutes. - Prajwol Sangat
If you just need evenly sized partitions, then repartition should get you what you want. Make sure you have enough workers/executors in your cluster to utilise your cores, otherwise the partitions will run sequentially anyway. On the other hand, if you need to process a particular subset of the rows together, then I agree with @Lukasz Tracewski below, but you may be able to use groupByKey, which would be simpler. - Alister Lee

1 Answer


I guess what you need is partitionBy. In Scala you can provide it with a custom-built HashPartitioner, while in Python you pass partitionFunc. There are a number of examples out there in Scala, so let me briefly explain the Python flavour.

partitionBy expects an RDD of tuples whose first element is the key; partitionFunc is called with that key. Let's assume you organise your data in the following fashion: (ROW_ID, (A,B,C,..)) where ROW_ID = [1,2,3,...,k]. You can always add ROW_ID and remove it afterwards.
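As a rough illustration of adding and removing ROW_ID, here is a minimal plain-Python sketch that does not use Spark. The sample rows and the names `rows`, `keyed` and `restored` are made up for the example. In PySpark, something similar could probably be done with `rdd.zipWithIndex()`, which yields (row, index) pairs with the index starting at 0, followed by a `map` that swaps the pair and adds 1.

```python
# Hypothetical sample data: six rows of four values each.
rows = [tuple(f"r{i}c{j}" for j in range(1, 5)) for i in range(1, 7)]

# Add ROW_ID (starting at 1) as the key: (ROW_ID, (A, B, C, D)).
keyed = [(row_id, row) for row_id, row in enumerate(rows, start=1)]

# Remove ROW_ID again once it is no longer needed.
restored = [row for _row_id, row in keyed]
```

The key exists only to drive the partitioning; the row payload itself is left untouched.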

To get a new partition every two rows:

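# ROW_ID starts at 1, so (key - 1) // 2 maps rows 1,2 -> 0, rows 3,4 -> 1, ...
num_partitions = (rdd.count() + 1) // 2  # round up so an odd last row gets its own partition
rdd = rdd.partitionBy(numPartitions=num_partitions,
                      partitionFunc=lambda key: (key - 1) // 2)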

partitionFunc will produce the sequence 0,0,1,1,2,2,... Each value is the index of the partition to which the corresponding row will belong.
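To make the assignment concrete, here is a small plain-Python simulation (no Spark needed) of how rows would end up grouped two per partition and then be turned into JSON records. It assumes ROW_ID starts at 1, which is why it uses `(key - 1) // 2`. The header names, the sample values and the helpers `partition_index` and `partition_to_records` are hypothetical. The final `% num_partitions` mirrors the fact that PySpark's partitionBy takes the result of partitionFunc modulo numPartitions.

```python
import json
from collections import defaultdict

header = ["col1", "col2", "col3", "col4"]  # hypothetical column names
keyed_rows = [(i, tuple(f"r{i}c{j}" for j in range(1, 5))) for i in range(1, 7)]

def partition_index(key, num_partitions):
    # Rows 1,2 -> 0; rows 3,4 -> 1; rows 5,6 -> 2 (then wrapped by the modulo).
    return ((key - 1) // 2) % num_partitions

# Round up so that an odd trailing row would still get its own partition.
num_partitions = (len(keyed_rows) + 1) // 2

# Simulate the shuffle: collect rows by their partition index.
partitions = defaultdict(list)
for key, row in keyed_rows:
    partitions[partition_index(key, num_partitions)].append(row)

def partition_to_records(rows):
    # Pair every value with its column header, one dict per row.
    return [dict(zip(header, row)) for row in rows]

records = [rec
           for idx in sorted(partitions)
           for rec in partition_to_records(partitions[idx])]
json_text = json.dumps(records)
```

In an actual Spark job the per-partition step would more likely live inside `mapPartitions`, so each partition builds its records independently; that wiring is left out here.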