0
votes

Imagine I have a huge dataset that I partitionBy('id'). Assume that id is unique to a person, so there can be n rows per id, and the goal is to reduce them to one row each. Basically, aggregating to make id distinct.

import sys
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window().partitionBy('id').rowsBetween(-sys.maxsize, sys.maxsize)

test1 = {
    key: F.first(key, True).over(w).alias(key)
    for key in some_dict.keys()
    if (some_dict[key] == 'test1')
}
test2 = {
    key: F.last(key, True).over(w).alias(key)
    for key in some_dict.keys()
    if (some_dict[key] == 'test2')
}

Assume that I have some_dict with values that are either 'test1' or 'test2', and based on the value I take either the first or the last, as shown above.

How do I actually call aggregate and reduce this?

 cols = {**test1, **test2}
 cols = list(cols.values())
 df.select(*cols).groupBy('id').agg(*cols)  # Doesn't work

The above clearly doesn't work. Any ideas? The goal here is: I have 5 unique IDs and 25 rows, with each ID having 5 rows. I want to reduce it from 25 rows to 5.

1
What is your sample data? And what is the order-by column? - Lamanus
Order by date, but that is not the point here. It's mostly about how to aggregate it. - druuu
The window function is not for aggregation; you should use group by for your purpose. Why are you mixing them? - Lamanus
I agree, but I'm using the window function for partitioning. Imagine the data being extremely big. The above example is clear, right? - druuu

1 Answer

1
votes

Let's assume your dataframe is named df and contains duplicates; use the method below:

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

window = Window.partitionBy(df['id']).orderBy(df['id'])

# row_number() numbers the rows within each partition; keep only the first
final = df.withColumn("row_id", row_number().over(window)).filter("row_id = 1")
final.show(10, False)

Change the orderBy condition if there is specific criteria, so that the particular record you want to keep ends up at the top of its partition.