0
votes

I am working with an RDD of x: key, y: set(values) called file.

#values: RDD of tuples (key, val)    
file = values.groupByKey().mapValues(set).cache()
info_file = array(file.map(lambda (x,y): len(y)).collect())
var = np.var(info_file) #extremely high
def f():
     ...
file.foreachPartition(f)

The variance of len(y) is extremely high, such that around 1% of the pairs' sets (verified with percentile method) makes the 20% of the total number of values in the sets total = np.sum(info_file). If Spark partitions randomly with shuffle, there're high chances that that 1% may fall in the same partition, making unbalanced loads among workers.

Is there a way to make sure that 'heavy' tuples are euqally distributed among partitions? I actually split the file in two partitions, heavy and light, based on a threshold value of len(y) given by threshold = np.percentile(info_file,99.9) in order to separate this group of tuples and then re-partition.

light = file.filter(lambda (x,y): len(y) < threshold).cache()
heavy = file.filter(lambda (x,y): len(y) >= threshold).cache()

light.foreachPartition(f)
heavy.foreachPartition(f)

but get almost the same running time. The load might be already optimized, just want to check if I can do something more / better.

1

1 Answers

1
votes

You can use Ganglia to monitor cluster load. That should give you a very good indication of any data skew that might cause uneven cluster load.

If you do have an unfortunate data skew, there are ways of battling it by for instance restructuring data or salting keys. See for instance this StackOverflow Q&A.

Note that you can also do, what you are doing now, with splitting the data into heavy partitions and light partitions, but in that case you want to cache the file - not heavy and light - since it is file you need to process multiple times. Like this:

cachedFile = file.cache()

light = cachedFile.filter(lambda (x,y): len(y) < threshold)
heavy = cachedFile.filter(lambda (x,y): len(y) >= threshold)

light.foreachPartition(f)
heavy.foreachPartition(f)

Hope it helps.