I'm building a Beam pipeline that transforms CSV files into XML files according to some rules. My approach so far has been to split the input CSV file into rows at the beginning of the pipeline and feed every row into it. In the pipeline I transform each row into an XML tag, and at the end I use CombineGlobally to merge everything into the final XML file. The problem is that I now need some extra information stored in Google Datastore to build the XML tag for each row of the CSV file, and I don't know how to do this: the query used to retrieve data from Datastore is passed to ReadFromDatastore as a runtime parameter (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore), but the query I need depends on the contents of the PCollection. I need to build a query like this:
Select from xxx where id in PCollection
Is there a way to do this, for example using something like CombineGlobally to build the query and then passing that query to ReadFromDatastore? Or is there any other way to do what I need?
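To make the "where id in PCollection" idea concrete, here is a rough sketch, in plain Python rather than Beam, of the kind of batched lookup I imagine. Everything in it is an assumption for illustration: `enrich_rows`, `chunked`, and the `fetch_by_ids` callable are hypothetical names, and `fetch_by_ids` stands in for whatever Datastore call would return the entities for a list of ids.

```python
from itertools import islice
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple

# (id, parsed CSV row); this shape is an assumption for the sketch.
Row = Tuple[str, Dict[str, Any]]


def chunked(rows: Iterable[Row], size: int) -> Iterator[List[Row]]:
    """Yield consecutive lists of at most `size` rows."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def enrich_rows(
    rows: Iterable[Row],
    fetch_by_ids: Callable[[List[str]], Dict[str, Any]],
    batch_size: int = 100,
) -> Iterator[Tuple[str, Dict[str, Any], Optional[Any]]]:
    """Attach the looked-up entity (or None) to every row, one lookup per batch.

    `fetch_by_ids` is hypothetical: it takes a list of ids and returns a
    dict mapping each id it found to the stored entity.
    """
    for chunk in chunked(rows, batch_size):
        ids = [row_id for row_id, _ in chunk]
        found = fetch_by_ids(ids)  # plays the role of "where id in (...)"
        for row_id, row in chunk:
            yield row_id, row, found.get(row_id)
```

I assume something like this could run inside a DoFn after grouping elements into batches (for example with beam.BatchElements), but I don't know whether that is the recommended way to do it in Beam.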
I have something like this right now:
    with beam.Pipeline(options=pipeline_options) as p:
        # Read the CSV rows and key them so they can be joined later.
        items = (
            p |
            'ReadCsvFile' >> beam.io.Read(CsvFileSource(input_name)) |
            'PrepareToJoin' >> beam.ParDo(PrepareToJoin())
        )
        # Read the extra data from Datastore using a query supplied up front.
        datastore_items = (
            p |
            'DatastoreDataP' >> ReadFromDatastore(project_id, query)
        )
        # Join both collections by key.
        new_items = (
            {'data': items, 'datastore': datastore_items} |
            'JoinWithDatastore' >> beam.CoGroupByKey() |
            'PostJoinProcess' >> beam.ParDo(PostJoinProcess())
        )
        # Turn every item into an XML tag and combine all tags into one file.
        xml_file = (
            new_items |
            'ItemToXmlTag' >> beam.ParDo(ItemToXmlTag()) |
            'MakeXMLFile' >> beam.CombineGlobally(XMLCombineFn()) |
            WriteToText(output_name)
        )
As you can see, query is a parameter in the pipeline.
Thanks in advance for your help!