
I'm building a pipeline using Apache Beam in Python, and I need to avoid a race condition when writing to a file in Google Cloud Storage.

The following link describes how to do concurrency control in Google Cloud Storage with gsutil.

https://cloud.google.com/storage/docs/gsutil/addlhelp/ObjectVersioningandConcurrencyControl#concurrency-control

Does anyone know if there is a way to accomplish the same thing using Python or the Apache Beam Python SDK?


1 Answer


If you need to perform certain operations sequentially, your best bet is to group the elements by key so that they are processed together.

For example, if you have two different elements writing to the same GCS file, you would want to do something like:

(my_collection
 | beam.Map(lambda x: (x['filename'], x))  # key each element by its target filename
 | beam.GroupByKey()                       # gather all elements for the same file
 | beam.Map(write_each_value))             # receives (filename, iterable of elements)

By performing a GroupByKey, you ensure that all elements with the same filename end up on the same worker, so their writes happen sequentially rather than racing against each other.
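To make the pattern concrete without running a pipeline, here is a plain-Python sketch of what the three transforms do. The element dicts, filenames, and the `write_each_value` body are all hypothetical; in the real pipeline, the final step would write to GCS instead of returning a string.

```python
from collections import defaultdict

# Hypothetical elements, each tagged with the GCS object it targets.
elements = [
    {'filename': 'gs://my-bucket/a.txt', 'value': 'first'},
    {'filename': 'gs://my-bucket/a.txt', 'value': 'second'},
    {'filename': 'gs://my-bucket/b.txt', 'value': 'only'},
]

# Step 1: key each element by filename
# (the beam.Map(lambda x: (x['filename'], x)) step).
keyed = [(x['filename'], x) for x in elements]

# Step 2: group by key (the beam.GroupByKey() step). Each key's values
# end up together, which is what removes the race between writers.
grouped = defaultdict(list)
for key, value in keyed:
    grouped[key].append(value)

# Step 3: one call per file, iterating its values in order
# (the beam.Map(write_each_value) step). Here we just join the values
# to show that writes within a group are sequential.
def write_each_value(filename, values):
    return filename, ','.join(v['value'] for v in values)

results = dict(write_each_value(k, vs) for k, vs in grouped.items())
```

Since only one `write_each_value` call ever touches a given filename, no two workers can write to the same GCS object concurrently.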