0
votes

How does one use type hints for ValueProvider value types passed to PTransform and DoFn classes?

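import apache_beam as beam
from apache_beam.options.value_provider import ValueProvider

class MyPTransform(beam.PTransform):
  def __init__(self, my_value_provider: ValueProvider):
    super().__init__()
    # How do I enforce that my_value_provider has a value_type of str?
    self.my_value_provider = my_value_provider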

I can make this a RuntimeValueProvider or StaticValueProvider and test this explicitly:

 my_value_provider.value_type == str

How do others do this? I didn't see anything here: https://beam.apache.org/documentation/sdks/python-type-safety

2 Answers

1
votes

I don't think there is a way to enforce this with Python's type checking, but you could always add your own runtime type check, which may improve error messages.

Alternatively, you can avoid having to use ValueProvider at all by using Flex Templates instead.

0
votes

I ended up creating a wrapper value provider like so; opinions and comments are welcome.

from typing import Generic, Type, TypeVar

from apache_beam.options.value_provider import ValueProvider, StaticValueProvider, RuntimeValueProvider

T = TypeVar('T')


class TypedValueProvider(ValueProvider, Generic[T]):
    """
    Provides a wrapper around Beam's ValueProvider to allow specifying value type hints.
    """

    def __init__(self, value_provider: ValueProvider, value_type: Type[T]):
        self.value_provider = value_provider
        self.value_type = value_type
        assert value_provider.value_type == value_type

    def is_accessible(self):
        return self.value_provider.is_accessible()

    def get(self):
        return self.value_provider.get()


if __name__ == '__main__':
    svp = StaticValueProvider(str, 'blah')
    dvp: TypedValueProvider[str] = TypedValueProvider(svp, str)
    print(isinstance(dvp, ValueProvider))
    assert 'blah' == dvp.get()

    rvp = RuntimeValueProvider('key', str, 'default')
    RuntimeValueProvider.set_runtime_options({'key': 'value'})

    # dvp is already annotated above; annotating it a second time would be flagged by mypy.
    dvp = TypedValueProvider(rvp, str)
    print(isinstance(dvp, ValueProvider))
    assert 'value' == dvp.get()