
I'm using the Apache Beam Python SDK and working on GCP Dataflow. How do I apply aggregate functions to multiple columns based on a single key? For example, a dataset of 10 columns where my data looks like:

User_id,product_id,year,quantity,price,...
101,1,2018,10,15,...
101,2,2019,1,10,...
102,1,2019,2,16,...

For each user_id, how do I calculate the number of distinct products purchased, max(quantity), min(price), etc.?

I have seen examples like wordcount where you can apply sum to the values in a (key, value) pair. What if I want to apply different transformations, like sum/mean/count, to different columns?


1 Answer


Beam uses PCollection, a parallel collection which in Python you can think of as (usually) a list of elements, typically tuples or dicts.
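For example, the sample rows from the question could become a PCollection like this (a minimal sketch; beam.Create is just a convenient in-memory source):

import apache_beam as beam

# A PCollection of row tuples, built from an in-memory list
with beam.Pipeline() as p:
    rows = p | beam.Create([
        # (User_id, product_id, year, quantity, price)
        (101, 1, 2018, 10, 15.0),
        (101, 2, 2019, 1, 10.0),
        (102, 1, 2019, 2, 16.0),
    ])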

In your case it might be a list of rows, so you would:

  1. extract the key from the row. If the key is User_id, then map something like this lambda, e.g.

lambda x: (x[0], x)

Note that x is used as the value in the k,v pair, and it still contains the key. That's fine; if you wanted, you could remove it and repack a value tuple without it. In other words, the tuple returned will be of a type like Tuple[int, Tuple[int, int, int, int, float]], assuming those are the correct types of User_id, product_id, year, quantity, price.

  2. apply a window

  3. group by key (it is very important to define the window before grouping by key, and to know that the window only takes effect when the group by key happens)

  4. use something to extract the columns (the values in the tuple) you are interested in, apply the aggregates, and repack for whatever is downstream

It may seem odd to apply aggregate functions inside a plain mapped function, but after the group by key each element's value is an iterable containing every row in the key-group for that window, so the aggregates run across the whole group.
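Concretely, after the group by key each element pairs a key with an iterable of every row for that key; for user 101 from the sample data, one element would look like:

element = (101, [(101, 1, 2018, 10, 15.0),
                 (101, 2, 2019, 1, 10.0)])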

This basic example can be extended easily: https://github.com/apache/beam/blob/ee96f66e14866f9642e9c67bf2ef231be7e7d55b/sdks/python/apache_beam/examples/wordcount.py#L99

If you need to do something simple, just map a function; if you need more than that, you can create a DoFn. This case is simple.
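For reference, the DoFn form of the same aggregation would be roughly this (another untested sketch): a class with a process method, applied with beam.ParDo, assuming the (key, iterable-of-rows) element shape produced by the group by key:

import apache_beam as beam

class MultiAggFn(beam.DoFn):
    def process(self, element):
        # element is (key, iterable of row tuples) after GroupByKey
        key, rows = element
        rows = list(rows)
        yield (key, (max(r[3] for r in rows), min(r[4] for r in rows)))

# applied as: grouped | beam.ParDo(MultiAggFn())

A DoFn earns its keep once you need multiple outputs, state, timers, or setup/teardown; for a pure one-in-one-out transform, the mapped function below is enough.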

Here is the simple mapped version. !warning, untested code written in transit!

def multi_agg(element):
    # After GroupByKey the value is an iterable of all rows for this key
    (key, rows) = element
    rows = list(rows)
    return (key, (max(r[3] for r in rows), min(r[4] for r in rows)))

In this case I've taken the user_id as the key from the previous step, computed the max of quantity and the min of price across the user's rows, and packed the results back into a k,v pair, which becomes an element of the downstream PCollection. The main reason you need k,v pairs is that transforms like GroupByKey implicitly use the first element of the tuple as the key to group on, and the entire element is implicitly passed as the argument to any function you map. Neither of those things is obvious when looking at the Apache Beam examples.
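End to end, a runnable version might look like this (same untested-in-transit caveat; beam.Create with the question's sample rows stands in for a real source such as a file read plus a parse step, and windowing is omitted because the bounded sample data all falls in the default global window):

import apache_beam as beam

def multi_agg(element):
    (key, rows) = element
    rows = list(rows)
    return (key, (max(r[3] for r in rows), min(r[4] for r in rows)))

with beam.Pipeline() as p:
    aggregated = (
        p
        | beam.Create([
            (101, 1, 2018, 10, 15.0),
            (101, 2, 2019, 1, 10.0),
            (102, 1, 2019, 2, 16.0),
        ])
        | 'KeyByUser' >> beam.Map(lambda x: (x[0], x))   # step 1
        | 'GroupByUser' >> beam.GroupByKey()             # step 3
        | 'Aggregate' >> beam.Map(multi_agg)             # step 4
        | beam.Map(print)
    )

This prints (101, (10, 10.0)) and (102, (2, 16.0)): for each user, the max quantity and the min price.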

You could either repack into a k,v pair for further downstream processing, or put the results into a structure ready to write to e.g. BigQuery, Bigtable, or perhaps a file in a Cloud Storage bucket. It's a good idea to use type hints in any case.
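For example, to write to BigQuery you could repack each aggregated tuple into a dict whose keys match the destination table's schema. The table name, schema string, and field names here are placeholders, not anything fixed by Beam:

import apache_beam as beam

def to_bq_row(element):
    key, (max_quantity, min_price) = element
    # Field names must match the destination table's schema
    return {'user_id': key, 'max_quantity': max_quantity, 'min_price': min_price}

# Downstream of the aggregation step:
# aggregated | beam.Map(to_bq_row) | beam.io.WriteToBigQuery(
#     'my-project:my_dataset.user_stats',
#     schema='user_id:INTEGER,max_quantity:INTEGER,min_price:FLOAT',
#     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)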