
I am trying to implement stateful processing with Apache Beam. I've gone through both of Kenneth Knowles's articles (Stateful processing with Apache Beam and Timely (and Stateful) Processing with Apache Beam), but I didn't find a solution to my issue. I am using the Python SDK.

In particular, I want a stateful DoFn whose state holds key-value pairs, to which I need to add new elements and occasionally remove individual ones.

One option seems to be a SetStateSpec with a tuple coder inside my DoFn class. The problem is that SetStateSpec offers no 'pop'-like function. As far as I can tell, the only way to delete elements is to delete them all with .clear(); there is no way to name a single element to erase.

One workaround would be to clear and rewrite the whole state every time I need to delete an element, but this looks inefficient to me.
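
To make the workaround concrete, here is a minimal sketch in plain Python. `FakeSetState` is a hypothetical in-memory stand-in that exposes only `read`, `add` and `clear`; it is not a Beam class, and `remove_key` is an illustrative helper name. It shows why every deletion ends up touching all stored elements.

```python
class FakeSetState:
    """Hypothetical in-memory stand-in exposing only read/add/clear (not a Beam class)."""

    def __init__(self):
        self._items = set()

    def read(self):
        return set(self._items)

    def add(self, item):
        self._items.add(item)

    def clear(self):
        self._items.clear()


def remove_key(state, key):
    """Drop every (key, value) pair for `key` by clearing and re-adding the rest."""
    kept = [(k, v) for k, v in state.read() if k != key]
    state.clear()
    for pair in kept:
        state.add(pair)


state = FakeSetState()
state.add(("a", 1))
state.add(("b", 2))
remove_key(state, "a")  # rewrites every surviving element
remaining = state.read()
```

Each removal reads, clears and rewrites the full set, so its cost grows with the number of stored elements.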

Do you have any idea on how to do it efficiently?

Python version 3.8.7
apache-beam==2.29.0

How about using ReadModifyWriteStateSpec with a custom coder for a dict, for example? Have a look at stackoverflow.com/a/45911664/8330018. It uses a heap, but you can borrow some ideas from there to write your own implementation of .pop, for example. – Tudor Plugaru
Hi @TudorPlugaru, thanks a lot for your answer. Do you have any idea how to implement a coder using the Python SDK classes? The example you posted uses the Java SDK and I am not confident with it. – Hulk'96
The Apache Beam repo is a nice resource if you want code examples. You can find an example of a custom coder here: github.com/apache/beam/blob/master/sdks/python/apache_beam/… – Tudor Plugaru

1 Answer


I followed @TudorPlugaru's suggestion and came up with this. I hope it will be useful for someone else.

import json
from apache_beam.coders import Coder

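class MyDictCoder(Coder):
    """Custom coder that serializes a dict as UTF-8 encoded JSON."""

    def encode(self, o):
        # sort_keys makes equal dicts encode to identical bytes,
        # which is what is_deterministic() promises
        return json.dumps(o, sort_keys=True).encode('utf-8')

    def decode(self, o):
        return json.loads(o.decode('utf-8'))

    def is_deterministic(self) -> bool:
        return True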
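
One thing to watch with a JSON-based coder is that a dict does not always come back unchanged from a round trip. The sketch below uses only the standard library; `encode` and `decode` are stand-alone helper functions that mirror the coder's two methods and are not part of Beam. It shows a pop surviving the round trip, and two cases where JSON changes the data.

```python
import json


def encode(d):
    # Mirrors the coder's encode step: dict -> UTF-8 JSON bytes
    return json.dumps(d, sort_keys=True).encode("utf-8")


def decode(b):
    # Mirrors the coder's decode step: UTF-8 JSON bytes -> dict
    return json.loads(b.decode("utf-8"))


# Add and remove entries on a plain dict, then round-trip it
state = {"a": 1, "b": [1, 2]}
state["c"] = 3
state.pop("a", None)
restored = decode(encode(state))  # equal to state: string keys, JSON-native values

# JSON object keys are always strings, so int keys change type
int_keys = decode(encode({1: "x"}))  # {"1": "x"}

# JSON has no tuple type, so tuples come back as lists
tuple_value = decode(encode({"k": (1, 2)}))  # {"k": [1, 2]}
```

If the state needs non-string keys or tuple values, a different serialization than JSON would be required.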

In the DoFn declaration:

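import apache_beam as beam
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class MyDoFn(beam.DoFn):
    DICTSTATE = ReadModifyWriteStateSpec(name='dictstate', coder=MyDictCoder())

    def process(self, element, dict_state=beam.DoFn.StateParam(DICTSTATE)):
        # Stateful DoFns need keyed (key, value) input; state is kept per key
        current = dict_state.read() or {}  # empty dict if nothing was written yet
        # Do something: add entries, or remove one with current.pop(some_key, None)
        dict_state.write(current)  # persist the modified dict
        yield current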

Then add this line to the pipeline code (as done in the Beam example):

import typing
beam.coders.registry.register_coder(typing.Dict, MyDictCoder)