0
votes

I am trying to use PySpark to find the average difference between adjacent list of tuples.

For example if I have a RDD like so

vals = [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]

I want to find the average difference for each key.

For example for key value "2"

The average difference would be (abs(110-130) + abs(130-120))/2 = 15.

This is my approach so far. I am trying to change the average calculation code to accommodate for this instead. But it doesn't seem to be working.

from pyspark import SparkContext
aTuple = (0,0)
interval = vals.aggregateByKey(aTuple, lambda a,b: (abs(a[0] - b),a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))
finalResult = interval.mapValues(lambda v: (v[0]/v[1])).collect()

I want to do this using the RDD functions, no Spark SQL or any other additional packages.

What would be the best way to do this?

Please let me know if you have any questions.

Thank you for your time.

1

1 Answers

1
votes

I came up with a naive approach to this. I am not sure if this will work in all cases. It goes something like this.

Lets first make a function to calculate the moving average. Please correct me if this is not the correct way to calculate moving average.

def get_abs(num_list):
    '''
    >>> get_abs([110, 130, 120])
    15.0
    '''
    acc = 0
    num_pairs = 0
    for i in range(len(num_list)-1):
        acc += abs(num_list[i]-num_list[i+1])
        num_pairs +=1
    return acc/num_pairs

Next, we parallelize the list

>>> vals = [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]
>>> rdd = sc.parallelize(vals)
>>> rdd.collect()
[(2, 110),
 (2, 130),
 (2, 120),
 (3, 200),
 (3, 206),
 (3, 206),
 (4, 150),
 (4, 160),
 (4, 170)]

Then, group the values belonging to the same list.

>>> vals = rdd.groupByKey().mapValues(list)
>>> vals.collect()
[(4, [150, 160, 170]), (2, [110, 130, 120]), (3, [200, 206, 206])]

Then we just need to call our function that we defined above to calculate the moving average on the grouped values.

>>> vals.mapValues(get_abs).collect()
[(4, 10.0), (2, 15.0), (3, 3.0)]