
I've built a Beam/Dataflow pipeline to process shapefiles. The pipeline is simple:

    with beam.Pipeline(options=pipeline_options) as p:
        feature_collection = (p
         | beam.Create([known_args.gcs_url])
         | 'LoadShapefile' >> beam.ParDo(LoadShapefile())
         | beam.Map(print))

where LoadShapefile is defined as:

    class LoadShapefile(beam.DoFn):
        def process(self, gcs_url):
            with beam.io.gcp.gcsio.GcsIO().open(gcs_url, 'rb') as f:
                collection = BytesCollection(f.read())
                return iter(collection)

This pipeline works great, but I need to capture an attribute of collection that isn't attached to the individual elements inside it: I need collection.crs to be available as a variable or argument inside a DoFn or beam.Map later in the pipeline in order to process each element correctly.

I'd like to return something like this:

    return (collection.crs, iter(collection))

but I cannot figure out how to separate out the collection iterator and the .crs property and have the pipeline work properly. Basically in the non-Beam world, I might consider setting a global variable crs that is available everywhere, but that isn't possible in Beam AFAIK.

What is the correct way to accomplish this in Beam?

EDIT: collection.crs is a small dict that will look something like {'init': 'epsg:2284'}. It will never contain more than a couple of items, but this metadata is vital for correctly processing the elements in collection.


2 Answers


You could use a tagged output for your little dictionary and then use that as a side input for your next step, but you would have to implement some branching logic.

Can you not use the information right then and there to refine the data before you pass it along the pipeline?
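
For example, a minimal sketch of that idea, assuming BytesCollection as in the question (apply_crs is a hypothetical stand-in for your CRS-dependent logic):

    class LoadShapefile(beam.DoFn):
        def process(self, gcs_url):
            with beam.io.gcp.gcsio.GcsIO().open(gcs_url, 'rb') as f:
                collection = BytesCollection(f.read())
                crs = collection.crs
                for elm in collection:
                    # crs is still in scope here, so the CRS-dependent work
                    # can happen before anything leaves this DoFn.
                    yield apply_crs(elm, crs)  # apply_crs: hypothetical helper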


You could output an iterable of tuples that pairs each element with the collection's crs dictionary, like so:

    class LoadShapefile(beam.DoFn):
        def process(self, gcs_url):
            with beam.io.gcp.gcsio.GcsIO().open(gcs_url, 'rb') as f:
                collection = BytesCollection(f.read())
                # Pair every element with the crs dict so it travels with the data.
                return [(elm, collection.crs) for elm in collection]
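
A downstream step can then unpack each pair, for instance (process_feature is a hypothetical stand-in for your per-element logic):

    feature_collection = (p
     | beam.Create([known_args.gcs_url])
     | 'LoadShapefile' >> beam.ParDo(LoadShapefile())
     | 'ProcessFeature' >> beam.Map(
           lambda pair: process_feature(pair[0], crs=pair[1])))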

You could also emit the crs dict on a tagged output and consume it as a side input:

    class LoadShapefile(beam.DoFn):
        def process(self, gcs_url):
            with beam.io.gcp.gcsio.GcsIO().open(gcs_url, 'rb') as f:
                collection = BytesCollection(f.read())
                for elm in collection:
                    yield elm
                # Emit the crs dict once, on a separate tagged output.
                yield beam.pvalue.TaggedOutput('crsdata', collection.crs)

And then you would do something like this:

    with beam.Pipeline(options=pipeline_options) as p:
        feature_collections = (p
         | beam.Create([known_args.gcs_url])
         | 'LoadShapefile' >> beam.ParDo(LoadShapefile()).with_outputs(
               'crsdata', main='main'))
        collection_crs = beam.pvalue.AsSingleton(feature_collections['crsdata'])
        feature_collection = feature_collections['main']
        # Use these PCollections as you see fit.

Note the .with_outputs call on the ParDo; it is what makes the 'crsdata' and 'main' outputs addressable.

Note that this only works for a single gcs_url input. If you have more, then your side input should be a beam.pvalue.AsDict or beam.pvalue.AsList instead of an AsSingleton.
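
To consume the side input downstream, a minimal sketch (again, process_feature is a hypothetical per-element function; Beam passes the AsSingleton value to every call):

    (feature_collection
     | 'ProcessFeature' >> beam.Map(
           lambda elm, crs: process_feature(elm, crs),
           crs=collection_crs))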