I've built a Beam/Dataflow pipeline to process shapefiles. I have this simple pipeline:
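import apache_beam as beam
from apache_beam.io.gcp import gcsio
from fiona import BytesCollection

class LoadShapefile(beam.DoFn):
    def process(self, gcs_url):
        # Read the whole shapefile from GCS and open it in memory with fiona.
        with gcsio.GcsIO().open(gcs_url, 'rb') as f:
            collection = BytesCollection(f.read())
            # Each feature in the collection becomes one output element.
            return iter(collection)

# known_args and pipeline_options come from argument parsing not shown here.
with beam.Pipeline(options=pipeline_options) as p:
    feature_collection = (p
                          | beam.Create([known_args.gcs_url])
                          | 'LoadShapefile' >> beam.ParDo(LoadShapefile())
                          | beam.Map(print))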
This pipeline works great, but I need to capture an additional attribute of collection that isn't available on the individual elements inside it. I need collection.crs to be available as a variable or argument inside a DoFn or beam.Map later in the pipeline so that each element can be processed correctly.
I'd like to return something like this:
return (collection.crs, iter(collection))
but I cannot figure out how to split the collection iterator from the .crs property and still have the pipeline work properly. Outside of Beam, I might just set a global variable crs that is available everywhere, but AFAIK that isn't possible in Beam.
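One likely reason the tuple return misbehaves: Beam treats whatever `process` returns as an iterable of output elements, so returning `(collection.crs, iter(collection))` emits two elements (the dict and the iterator object) rather than one element per feature. A common workaround is to make `process` a generator that pairs the CRS with every feature. The sketch below is plain Python so it runs without Beam or fiona; `FakeCollection` is a hypothetical stand-in for fiona's `BytesCollection` (it only mimics `.crs` and iteration), and `process_returning_tuple` and `features_with_crs` are hypothetical helper names.

```python
from typing import Any, Dict, Iterable, Iterator, Tuple


class FakeCollection:
    """Hypothetical stand-in for fiona's BytesCollection: has .crs and iterates features."""

    def __init__(self, crs: Dict[str, str], features: Iterable[Dict[str, Any]]):
        self.crs = crs
        self._features = list(features)

    def __iter__(self) -> Iterator[Dict[str, Any]]:
        return iter(self._features)


def process_returning_tuple(collection: FakeCollection) -> Tuple[Any, Any]:
    # Mirrors `return (collection.crs, iter(collection))`. Iterating this result
    # yields exactly 2 items, which Beam would emit as 2 separate elements.
    return (collection.crs, iter(collection))


def features_with_crs(
    collection: FakeCollection,
) -> Iterator[Tuple[Dict[str, str], Dict[str, Any]]]:
    # Generator body suitable for DoFn.process: one (crs, feature) pair per
    # feature, so every downstream element carries the small CRS dict with it.
    crs = collection.crs
    for feature in collection:
        yield crs, feature


if __name__ == "__main__":
    coll = FakeCollection({"init": "epsg:2284"}, [{"id": "0"}, {"id": "1"}])
    print(len(list(process_returning_tuple(coll))))  # 2: the dict and the iterator
    for crs, feature in features_with_crs(coll):
        print(crs["init"], feature["id"])
```

In the real `LoadShapefile.process`, the `return iter(collection)` line could become `yield from features_with_crs(collection)` (or an inline loop yielding `(collection.crs, feature)`), and a later `beam.MapTuple(lambda crs, feature: ...)` would receive both values. Since the CRS dict holds only a couple of items, repeating it on every element should be cheap.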
What is the correct way to accomplish this in Beam?
EDIT: collection.crs is a small dict that looks something like this: {'init': 'epsg:2284'}. This dict will never contain more than a couple of items, but this metadata is vital for correct processing of the elements in collection.
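Assuming each element arrives as a `(crs, feature)` pair, a downstream step can read the EPSG code straight out of a dict shaped like `{'init': 'epsg:2284'}`. This is a minimal sketch; `epsg_from_crs` and `tag_feature` are hypothetical names, and the parsing only handles the old-style `'init'` form shown in the edit above.

```python
from typing import Any, Dict, Optional


def epsg_from_crs(crs: Dict[str, str]) -> Optional[int]:
    # Hypothetical helper: extracts the numeric code from an old-style
    # {'init': 'epsg:XXXX'} dict; returns None for any other shape.
    init = crs.get("init", "")
    prefix, _, code = init.partition(":")
    if prefix.lower() == "epsg" and code.isdigit():
        return int(code)
    return None


def tag_feature(crs: Dict[str, str], feature: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical downstream step with the signature a beam.MapTuple callable
    # would have for (crs, feature) pairs. Returns a new dict; input is not mutated.
    return {**feature, "epsg": epsg_from_crs(crs)}


if __name__ == "__main__":
    print(epsg_from_crs({"init": "epsg:2284"}))  # 2284
    print(tag_feature({"init": "epsg:2284"}, {"id": "0"}))
```

If attaching the dict to every element is undesirable, Beam side inputs are the usual alternative for a small pipeline-wide value: emit the CRS on a separate output (for example via `beam.pvalue.TaggedOutput` and `.with_outputs()`) and pass it to the later step with `beam.pvalue.AsSingleton`. Note that `AsSingleton` expects exactly one element, which matches the single-URL `beam.Create` here but would not hold for several shapefiles.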