I have an Apache Beam pipeline that reads data from Google Cloud Datastore. The pipeline runs on Google Cloud Dataflow in batch mode and is written in Python.

The problem is with a templated argument that I'm trying to use to build a Datastore query with a dynamic timestamp filter.

The pipeline is defined as follows:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.types import Query

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--filter', type=int)

pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as p:

    user_options = pipeline_options.view_as(UserOptions)

    data = (p
        | 'Read' >> ReadFromDatastore(build_query(user_options.filter.get()))
        | ...
    )

And the build_query function is defined as follows:

def build_query(filter):
    return Query(
        kind='Kind',
        filters=[('timestamp', '>', filter)],
        project='Project'
    )

Running this fails with the error RuntimeValueProvider(...).get() not called from a runtime context.

I have also tried ReadFromDatastore(build_query(user_options.filter)), but then the error is ValueError: (u"Unknown protobuf attr type [while running 'Read/Read']", <class 'apache_beam.options.value_provider.RuntimeValueProvider'>).

Everything works fine if the templated argument is taken out of the equation, e.g. ReadFromDatastore(build_query(1563276063)). So the problem is specifically with using a templated argument to build the Datastore query.
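The second error is consistent with the ValueProvider object itself, rather than an int, reaching the code that converts filter values into Datastore protobuf values. In the google-cloud-datastore client that conversion dispatches on the Python type of the value and raises ValueError("Unknown protobuf attr type", type(val)) for any type it does not recognize. The sketch below is a simplified, hypothetical version of that dispatch (to_property_value and TemplateParameter are illustrative names, not library APIs) showing why a literal int is accepted while a placeholder object is not:

```python
import datetime


class TemplateParameter:
    """Hypothetical stand-in for a RuntimeValueProvider: a placeholder
    object whose real value is only known when the template runs."""

    def __init__(self, name):
        self.name = name


def to_property_value(val):
    """Hypothetical, simplified type dispatch for a Datastore filter value."""
    if isinstance(val, bool):  # check bool before int: bool subclasses int
        return ("boolean_value", val)
    if isinstance(val, int):
        return ("integer_value", val)
    if isinstance(val, float):
        return ("double_value", val)
    if isinstance(val, str):
        return ("string_value", val)
    if isinstance(val, datetime.datetime):
        return ("timestamp_value", val)
    # Any other type, including a placeholder object, cannot be encoded.
    raise ValueError("Unknown protobuf attr type", type(val))


if __name__ == "__main__":
    print(to_property_value(1563276063))  # ('integer_value', 1563276063)
    try:
        to_property_value(TemplateParameter("filter"))
    except ValueError as err:
        print(err.args[0])  # Unknown protobuf attr type
```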

My guess is that build_query should be defined differently, but after spending some time with the documentation and searching online I still have no idea how.

Any suggestions on how to solve this are highly appreciated!

EDIT 1

Actually, in this case the filter is always relative to the current timestamp, so passing it as an argument is probably not even necessary if there is some other way to use dynamic values. I tried ReadFromDatastore(build_query(int(time())-90000)), but two consecutive runs used exactly the same filter.
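Identical filters across runs are what a classic template would be expected to produce: the pipeline graph, including the Query returned by build_query, is constructed once when the template is created, so int(time())-90000 is evaluated at that moment and the resulting constant is stored in the template. Every run then reuses that stored constant. The sketch below imitates this with pickle; create_template and run_template are hypothetical names, a plain dict stands in for the Query object, and the clock is injectable so the behaviour is deterministic:

```python
import pickle
import time


def build_query(min_timestamp):
    # Plain dict standing in for the Datastore Query (hypothetical).
    return {"kind": "Kind", "filters": [("timestamp", ">", min_timestamp)]}


def create_template(clock=time.time):
    # "Template creation": the graph is built once, so this expression is
    # evaluated exactly once and its result is serialized with the template.
    query = build_query(int(clock()) - 90000)
    return pickle.dumps(query)


def run_template(template):
    # Each "run" deserializes the same bytes; nothing is recomputed.
    return pickle.loads(template)


if __name__ == "__main__":
    template = create_template(clock=lambda: 1563366063)
    first_run = run_template(template)
    second_run = run_template(template)
    print(first_run == second_run)  # True
    print(first_run["filters"])  # [('timestamp', '>', 1563276063)]
```

Because the timestamp is baked in before any run starts, the only ways to get a fresh value are to recreate the template each time or to compute the value inside code that executes at runtime.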

Answer

Value providers need to be supported by the source you're using: only the source can unpack the value at the right moment, i.e. when the job is actually running.

When you write your own source, you obviously have full control over this. When using a pre-existing source, I only see two options:

  1. Provide the value at template creation time, i.e. don't use a template argument for it.
  2. Open a PR against the pre-existing source so that it supports template arguments.
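To illustrate what "supported by the source" means, here is a minimal, hypothetical sketch in plain Python with no Beam imports. RuntimeValueProviderStub imitates the contract of Beam's RuntimeValueProvider (get() fails until the job is running), and DeferredQuerySource is an illustrative source that stores the provider at construction time and only calls get() when it is asked to read. Both names are assumptions made for this example, not real APIs.

```python
class RuntimeValueProviderStub:
    """Hypothetical imitation of Beam's RuntimeValueProvider contract."""

    def __init__(self, option_name):
        self.option_name = option_name
        self._value = None
        self._at_runtime = False

    def start_runtime(self, value):
        # Simulates the runner supplying the template parameter at job start.
        self._value = value
        self._at_runtime = True

    def get(self):
        if not self._at_runtime:
            raise RuntimeError(
                "%s.get() not called from a runtime context" % self.option_name)
        return self._value


class DeferredQuerySource:
    """Hypothetical source that keeps the provider and resolves it on read."""

    def __init__(self, kind, prop, op, value_provider):
        # Construction time: store the provider, do NOT call get() here.
        self.kind = kind
        self.prop = prop
        self.op = op
        self.value_provider = value_provider

    def read(self):
        # Runtime: now it is safe to unpack the value and build the query.
        value = self.value_provider.get()
        return {"kind": self.kind, "filters": [(self.prop, self.op, value)]}


if __name__ == "__main__":
    provider = RuntimeValueProviderStub("filter")
    source = DeferredQuerySource("Kind", "timestamp", ">", provider)  # no get()
    try:
        provider.get()  # what calling .get() during pipeline construction does
    except RuntimeError as err:
        print(err)  # filter.get() not called from a runtime context
    provider.start_runtime(1563276063)
    print(source.read())  # {'kind': 'Kind', 'filters': [('timestamp', '>', 1563276063)]}
```

The whole point is where get() is called. Option 1 sidesteps the provider entirely, while option 2 would roughly amount to giving ReadFromDatastore the deferred behaviour of DeferredQuerySource. In your own code, the same idea applies to a DoFn that receives the ValueProvider and calls .get() inside process rather than while the pipeline is being built.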