Is it possible to apply a stateful transformation to only the values in a keyed PCollection?

For the sake of example, let's say this PCollection is keyed on zip codes, and the values are dictionaries that contain a user_id key. In this stateful DoFn, I want to keep track of all of the user_ids I have seen per zip code. Given the sheer number of zip codes, storing every (zip code, user_id) pair in state becomes intractable. However, if I could apply this stateful DoFn per key, I wouldn't need to store the zip code in state explicitly.

From the Python documentation, it doesn't look like this is possible. Would the best way be to abuse a custom CombineFn?

Thanks!

Can you add a code example of your current DoFn implementation to the question? - Nick_Kh

1 Answer

I think what you want is CombinePerKey. It applies a CombineFn to the values of each key separately.

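Also, keep the reduce phase in mind when using Combine: Beam may combine a key's values in several partial steps and then merge those partial results, so the function you pass can be called on its own earlier outputs as well as on the raw input values.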

Hope this example helps. (I added the print so you can see the reduce phase and why the if checks are needed.)

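import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Default options: runs locally on the DirectRunner.
pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as p: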

    keyed_elements = [
        (47001, {"user_id": 1, "fake_key":"fake_value"}),
        (47001, {"user_id": 2, "fake_key": "fake_value"}),
        (47002, {"user_id": 3, "fake_key": "fake_value"}),
        (47002, {"user_id": 4, "fake_key": "fake_value"}),
        (47003, {"user_id": 5, "fake_key": "fake_value"}),
        (47001, {"user_id": 6, "fake_key": "fake_value"}),
        (47001, {"user_id": 7, "fake_key": "fake_value"}),
        (47001, {"user_id": 8, "fake_key": "fake_value"}),
        (47001, {"user_id": 9, "fake_key": "fake_value"}),
        (47001, {"user_id": 10, "fake_key": "fake_value"}),
        (47001, {"user_id": 11, "fake_key": "fake_value"}),
    ]

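    def group_users(elements_values):
        # Beam may call this on raw input values (dicts) and on its own
        # earlier outputs (lists of user_ids), so both cases are handled.

        # Print every call to see the partial combining in the reduce phase.
        print(f"ELEMENT: {elements_values}")

        final_output = []
        for value in elements_values:
            if isinstance(value, dict):
                # Raw input value: extract its user_id.
                final_output.append(value['user_id'])
            elif isinstance(value, list):
                # Partial result from an earlier call: merge it in.
                final_output += value

        return final_output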

    (p | beam.Create(keyed_elements)
       | beam.CombinePerKey(group_users)
       | beam.Map(print)
    )