1 vote

I have been experimenting with partitions and repartitioning of PySpark RDDs.

I noticed that when repartitioning a small sample RDD from 2 to 6 partitions, a few empty partitions are simply added.

rdd = sc.parallelize([1,2,3,43,54,678], 2)  
rdd.glom().collect()  
>>> [[1, 2, 3], [43, 54, 678]]

rdd6 = rdd.repartition(6)
rdd6.glom().collect()
>>> [[], [1, 2, 3], [], [], [], [43, 54, 678]]

Now, I wonder if that also happens in my real data.

It seems I can't use glom() on larger data (a df with 192,497 rows):
df.rdd.glom().collect()
When I try this, nothing seems to happen. That makes sense, though; the resulting print would be enormous...

SO

I'd like to print each partition to check whether it is empty, or at least the top 20 elements of each partition.

Any ideas?

PS: I found solutions for Spark, but I couldn't get them to work in PySpark...
How to print elements of particular RDD partition in Spark?

Btw: if someone can explain why I get those empty partitions in the first place, I'd be all ears...
Or how I can know when to expect this and how to avoid it.
Or does having empty partitions in a dataset simply not affect performance?

2 Answers

1 vote

Apparently (and surprisingly), rdd.repartition only does a coalesce here, so there is no real shuffle, which explains why the distribution is unequal. One way around this is to repartition via a DataFrame:

rdd = sc.parallelize([1,2,3,43,54,678], 2)  
rdd.glom().collect()  
>>> [[1, 2, 3], [43, 54, 678]]

rdd6 = rdd.repartition(6)
rdd6.glom().collect()
>>> [[], [1, 2, 3], [], [], [], [43, 54, 678]]

from pyspark.sql import types as T

rdd6_df = spark.createDataFrame(rdd, T.IntegerType()).repartition(6).rdd
rdd6_df.glom().collect()
>>> [[Row(value=678)],
 [Row(value=3)],
 [Row(value=2)],
 [Row(value=1)],
 [Row(value=43)],
 [Row(value=54)]]
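
Note that going through a DataFrame wraps every value in a Row. If you need plain values again, a minimal sketch (the variable name rdd6_plain is just illustrative) is to map each Row back to its single field; since map() preserves partitioning, the glom output should match the Row layout shown above:

rdd6_plain = rdd6_df.map(lambda row: row[0])  # unwrap the single-column Row back to an int
rdd6_plain.glom().collect()
>>> [[678], [3], [2], [1], [43], [54]]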
0 votes

Concerning how to check whether partitions are empty, I came across a few solutions myself:

  1. (if there aren't that many partitions) Collect with glom() and index a single partition:
rdd.glom().collect()
>>> nothing happens

rdd.glom().collect()[1]
>>> [1, 2, 3]

Careful though: it will truly print the whole partition. For my data it resulted in a few thousand lines of output, but it worked! (See the sketch after this list for printing only the first few elements of each partition.)

source: How to print elements of particular RDD partition in Spark?

  2. Count the lines in each partition and show the smallest/largest count:
l = df.rdd.mapPartitionsWithIndex(lambda x, it: [(x, sum(1 for _ in it))]).collect()

min(l,key=lambda item:item[1])
>>>(2, 61705)

max(l,key=lambda item:item[1])
>>>(0, 65875)

source: Spark Dataframes: Skewed Partition after Join
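
And for the original wish of printing just the top 20 elements of each partition, a sketch along these lines should work (the variable name preview and the limit of 20 are just illustrative choices, not from the sources above):

from itertools import islice

# keep the partition index and at most the first 20 elements of each partition
preview = df.rdd.mapPartitionsWithIndex(
    lambda idx, it: [(idx, list(islice(it, 20)))]
).collect()

for idx, rows in preview:
    print(idx, rows)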