Beam uses PCollection, a parallel collection which in Python you can think of as (usually) a list of elements (usually tuples or dicts).
In your case it might be a list of "rows", so you would:
- extract the key of the row. If that were User_id, then map something like this lambda, e.g. lambda x: (x[0], x)
Note that x is used as the value in the k,v pair and still contains the key, but that's fine; if you wanted, you could remove it and repack a value tuple without it. In other words, the tuple returned will have a type similar to Tuple[int, Tuple[int, int, int, float, float]], assuming those are the correct types of User_id, product_id, year, quantity, price.
- apply a window
- group by key (it's very important to define the window before grouping by key, and to know that the window only takes effect when the GroupByKey happens)
- use something to extract the columns (the values in the tuple) you are interested in, apply the aggregates, and repack the result for whatever is downstream.
It might seem odd to apply an aggregate function to what looks like a single tuple value, but after GroupByKey the value is an iterable of every row for that key in that window, so the aggregation runs across the entire key-group.
This basic example can easily be extended: https://github.com/apache/beam/blob/ee96f66e14866f9642e9c67bf2ef231be7e7d55b/sdks/python/apache_beam/examples/wordcount.py#L99
If you need to do something simple, just map a function; if you need more than that, you can create a DoFn. This one is simple.
For example (warning: untested code written in transit):
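def multi_agg(element):
    # element is (key, rows) from GroupByKey: every row for that key in the window
    key, rows = element
    rows = list(rows)  # materialize so the rows can be iterated twice
    return (key, (max(r[3] for r in rows), min(r[4] for r in rows)))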
In multi_agg I've taken User_id as the key from the previous step, computed the max of quantity and the min of price, and packed the result back into a (k, v) tuple. That tuple is one element of the downstream PCollection. The main reason you need k,v pairs is that transforms like GroupByKey implicitly use the first value of each element as the key to group on, and a function passed to Map implicitly receives the entire element as its single argument. Neither of those is obvious when looking at the Apache Beam examples.
You could either repack into a k,v pair for further downstream processing, or put the result into a structure ready to write to, e.g., BigQuery, Bigtable, or a file in a Cloud Storage bucket. Using type hints is a great idea in either case.