I'm currently working on a larger Apache Beam pipeline with the Python API which reads data from BigQuery and in the end writes it back to another BigQuery table.

One of the transforms needs to use a binary program to transform the data, and that program has to load a 23GB file of binary lookup data. Starting and running the program therefore involves a lot of overhead (it takes about 2 minutes to load and run each time) and a lot of RAM, so it wouldn't make sense to start it for just a single record. On top of that, the 23GB file would need to be copied locally from Cloud Storage every time.

The workflow for the binary would be (a rough sketch follows the list):

  1. Copy the 23GB file from Cloud Storage if it's not there already

  2. Save the records to a file

  3. Run the binary with call()

  4. Read the output of the binary and return it
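
Roughly, a single invocation would look something like the sketch below (all paths, flags, and the binary name are placeholders):

    import os
    import subprocess
    import tempfile

    LOOKUP_URI = 'gs://my-bucket/lookup.bin'   # placeholder GCS location
    LOOKUP_LOCAL = '/tmp/lookup.bin'           # placeholder local cache path
    BINARY = '/opt/my_binary'                  # placeholder binary

    def run_binary(records):
        """Runs the binary over an iterable of records, returns the output lines."""
        # 1. Copy the 23GB file from Cloud Storage if it's not there already.
        if not os.path.exists(LOOKUP_LOCAL):
            subprocess.check_call(['gsutil', 'cp', LOOKUP_URI, LOOKUP_LOCAL])

        # 2. Save the records to a file.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.in', delete=False) as infile:
            for record in records:
                infile.write('%s\n' % record)

        # 3. Run the binary with call() (check_call raises if the binary fails).
        outpath = infile.name + '.out'
        subprocess.check_call([BINARY, '--lookup', LOOKUP_LOCAL,
                               '--input', infile.name, '--output', outpath])

        # 4. Read the output of the binary and return it.
        with open(outpath) as f:
            return [line.rstrip('\n') for line in f]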

The number of records the program can process at a time is basically unlimited, so it would be nice to have a somewhat-distributed Beam transform where I could specify a batch size (say 100'000 records at a time), but still have the work distributed so that batches of 100'000 records can run on multiple nodes at once.

I don't see Beam supporting this behaviour out of the box. It might be possible to hack something together as a KeyedCombineFn operation that collects records based on some split criterion/key and then runs the binary over the accumulated records in the merge_accumulators step, but this seems very hackish to me.

Or is it possible to GroupByKey and process groups as batches? Does this guarantee that each group is processed at once, or can groups be split behind the scenes by Beam?

I also saw that there's a GroupIntoBatches transform in the Java API, which sounds like exactly what I'd need, but as far as I can tell it isn't available in the Python SDK.

My two questions are: what's the best way (performance-wise) to achieve this use case in Apache Beam, and if there isn't a good solution, is there some other Google Cloud service that might be better suited and could be used like Beam --> Other Service --> Beam?


1 Answer


Groups cannot be split behind the scenes, so using a GroupByKey should work. In fact, this is a requirement since each individual element must be processed on a single machine and after a GroupByKey all values with a given key are part of the same element.

You will likely want to assign random keys. Keep in mind that if there are too many values with a given key, it may also be difficult to pass all of those values to your program, so you may want to limit how many values you pass to the program at a time and/or adjust how you assign keys.
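
As a minimal sketch of that wiring, assuming the records PCollection comes from your BigQuery read and reusing the hypothetical run_binary helper sketched in the question (the number of keys is just a tuning knob, roughly total records divided by the desired batch size):

    import random

    import apache_beam as beam

    NUM_KEYS = 50  # assumption: roughly total_records / desired_batch_size

    results = (
        records  # the PCollection of individual records read from BigQuery
        | 'AssignRandomKey' >> beam.Map(
            lambda rec: (random.randint(0, NUM_KEYS - 1), rec))
        | 'GroupByKey' >> beam.GroupByKey()
        # Each (key, iterable-of-records) pair lands on a single worker, so the
        # binary runs once per group; run_binary is the helper from the question.
        | 'RunBinary' >> beam.FlatMap(lambda kv: run_binary(kv[1])))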

One trick for assigning keys is to generate a random number in start_bundle (say, between 1 and 1000) and then in process just increment it for each element, wrapping 1001 back around to 1. This avoids generating a random number for every element and still ensures a good distribution of keys.
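
A sketch of that trick as a DoFn (the key range of 1000 is just an example value):

    import random

    import apache_beam as beam

    class AssignBatchKeys(beam.DoFn):
        """Assigns keys 1..num_keys by incrementing a per-bundle counter that
        starts at a random position, instead of drawing a random number for
        every element."""

        def __init__(self, num_keys=1000):
            super(AssignBatchKeys, self).__init__()
            self._num_keys = num_keys

        def start_bundle(self):
            # One random draw per bundle.
            self._key = random.randint(1, self._num_keys)

        def process(self, element):
            # Increment for every element, wrapping back around to 1.
            self._key += 1
            if self._key > self._num_keys:
                self._key = 1
            yield (self._key, element)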

You could create a composite PTransform for this logic (dividing a PCollection<T> into PCollection<List<T>> chunks for processing), which would potentially be reusable in similar situations.
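
For example, a composite transform along these lines (the name is made up, and it reuses the AssignBatchKeys DoFn sketched above):

    import apache_beam as beam

    class GroupIntoLists(beam.PTransform):
        """Hypothetical composite: PCollection<T> -> PCollection<List<T>>."""

        def __init__(self, num_keys=1000):
            super(GroupIntoLists, self).__init__()
            self._num_keys = num_keys

        def expand(self, pcoll):
            return (
                pcoll
                # AssignBatchKeys is the DoFn sketched above; any keying works.
                | 'AssignBatchKeys' >> beam.ParDo(AssignBatchKeys(self._num_keys))
                | 'GroupByKey' >> beam.GroupByKey()
                | 'DropKeys' >> beam.Map(lambda kv: list(kv[1])))

    # Usage sketch:
    #   chunks = records | 'Batch' >> GroupIntoLists(num_keys=1000)
    #   results = chunks | 'RunBinary' >> beam.FlatMap(run_binary)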