I have an Apache Beam pipeline that reads data from Google Cloud Datastore. The pipeline is written in Python and runs on Google Cloud Dataflow in batch mode.
The problem is with a templated argument that I'm trying to use to create a Datastore query with a dynamic timestamp filter.
The pipeline is defined as follows:
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
    from apache_beam.io.gcp.datastore.v1new.types import Query

    class UserOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_value_provider_argument('--filter', type=int)

    pipeline_options = PipelineOptions()

    with beam.Pipeline(options=pipeline_options) as p:
        user_options = pipeline_options.view_as(UserOptions)
        data = (p
                | 'Read' >> ReadFromDatastore(build_query(user_options.filter.get()))
                | ...
And the build_query function is as follows:
    def build_query(filter):
        return Query(
            kind='Kind',
            filters=[('timestamp', '>', filter)],
            project='Project'
        )
Running this leads to the error RuntimeValueProvider(...).get() not called from a runtime context.
I have also tried ReadFromDatastore(build_query(user_options.filter)), but then the error is ValueError: (u"Unknown protobuf attr type [while running 'Read/Read']", <class 'apache_beam.options.value_provider.RuntimeValueProvider'>).
Everything works fine if the templated argument is removed from the equation, e.g. ReadFromDatastore(build_query(1563276063)). So the problem is with using a templated argument while building the Datastore query.
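For what it's worth, this is how I currently understand the first error, as a Beam-free sketch. ToyRuntimeValue is a hypothetical stand-in written for illustration, not Beam's actual RuntimeValueProvider, and the dict-based build_query only imitates the real Query. It just mimics the idea that a templated value does not exist until the job runs:

```python
class ToyRuntimeValue:
    """Hypothetical stand-in for a templated argument (not Beam's real class)."""

    def __init__(self, name):
        self.name = name
        self._value = None
        self._at_runtime = False

    def supply(self, value):
        # Simulates the runner providing the argument when the job starts.
        self._value = value
        self._at_runtime = True

    def get(self):
        if not self._at_runtime:
            raise RuntimeError(
                f"{self.name}.get() not called from a runtime context")
        return self._value


def build_query(ts):
    # Plain-dict stand-in for the Datastore Query used in the question.
    return {"kind": "Kind", "filters": [("timestamp", ">", ts)]}


filter_arg = ToyRuntimeValue("filter")

# Graph construction time: the value is not known yet, so get() fails.
try:
    build_query(filter_arg.get())
    construction_error = None
except RuntimeError as exc:
    construction_error = str(exc)

# Job execution time: the value has been supplied, so get() works.
filter_arg.supply(1563276063)
runtime_query = build_query(filter_arg.get())
```

If this picture is right, the query would have to be built inside something that executes at runtime rather than while the pipeline graph is being constructed.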
My guess is that build_query should be defined some other way, but after spending some time with the documentation and googling I still have no idea how.
Any suggestions on how I could solve this are highly appreciated!
EDIT 1
Actually, in this case the filter is always relative to the current timestamp, so passing it as an argument is probably not even necessary if there is some other way to use dynamic values. I tried ReadFromDatastore(build_query(int(time())-90000)), but two consecutive runs contained exactly the same filter.
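My suspicion is that the expression is evaluated once, when the template is built, and the resulting constant is baked into it. Below is a Beam-free sketch of that suspicion. The names fake_clock, make_template and make_deferred_template, as well as the dict-based build_query, are hypothetical and exist only for this illustration:

```python
import itertools

# Hypothetical clock that advances by 1000 seconds per call, so the example
# is deterministic instead of depending on the real time().
_ticks = itertools.count(start=1563276063, step=1000)


def fake_clock():
    return next(_ticks)


def build_query(ts):
    # Plain-dict stand-in for the Datastore Query used in the question.
    return {"kind": "Kind", "filters": [("timestamp", ">", ts)]}


def make_template():
    # The timestamp is evaluated here, once, when the "template" is created.
    query = build_query(fake_clock() - 90000)

    def run():
        # Every run reuses the query captured above.
        return query

    return run


def make_deferred_template():
    def run():
        # The timestamp is evaluated inside run(), so each run gets a new one.
        return build_query(fake_clock() - 90000)

    return run


template = make_template()
eager_runs = [template(), template()]

deferred = make_deferred_template()
deferred_runs = [deferred(), deferred()]
```

In this toy version eager_runs holds two identical queries while deferred_runs holds two different ones, which matches what I see with the real template.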