I have a Beam streaming job running on the Dataflow runner. It loads requests from PubSub (using Python's apache_beam.io.ReadFromPubSub), then fetches data from BigTable, does a heavy computation on the data, and writes the result to PubSub again.
with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Receive" >> beam.io.ReadFromPubSub(topic=TOPIC_READ)
        | "Parse" >> beam.ParDo(Parse())
        | "Fetch" >> beam.ParDo(FetchFromBigtable(project, args.bt_instance, args.bt_par, args.bt_batch))
        | "Process" >> beam.ParDo(Process())
        | "Publish" >> beam.io.WriteToPubSub(topic=TOPIC_WRITE)
    )
Basically I don't need any windowing; I would just like to limit the number of elements processed in parallel on one machine (i.e. control the parallelism via the number of workers). Otherwise it causes out-of-memory errors during the heavy computation, and I also need to limit the rate of BigTable requests.
I'm using a standard 2-CPU machine, so I would expect it to process 2 elements in parallel - I also set --number_of_worker_harness_threads=2 and --sdk_worker_parallelism=1. For some reason, though, I'm seeing many elements processed in parallel by multiple threads, which causes the memory and rate-limit problems. I guess those are bundles processed in parallel, based on the logs (e.g. work: "process_bundle-105").
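For reference, this is roughly how I build the pipeline options - only the two parallelism-related flags matter for this question; the project, region, bucket and machine type values below are placeholders for my real settings:

from apache_beam.options.pipeline_options import PipelineOptions

# Sketch of my options; everything except the two parallelism flags
# is a placeholder.
pipeline_options = PipelineOptions(
    streaming=True,
    runner="DataflowRunner",
    project="my-project",                # placeholder
    region="us-central1",                # placeholder
    temp_location="gs://my-bucket/tmp",  # placeholder
    machine_type="n1-standard-2",        # standard 2-CPU worker
    number_of_worker_harness_threads=2,
    sdk_worker_parallelism=1,
)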
I tried to hack around it by using a semaphore inside processElement (to process just one element per DoFn instance) and it works, but autoscaling does not kick in, and it looks like a pure hack which may have other consequences.
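Roughly what the hack looks like (a sketch - Process and heavy_computation stand in for my real code, and the semaphore is module-level so it is shared by all threads in the worker process):

import threading

import apache_beam as beam

# Shared by every DoFn instance/thread in this worker process, so at most
# one element is inside the heavy section at a time.
_semaphore = threading.Semaphore(1)

class Process(beam.DoFn):
    def process(self, element):
        with _semaphore:
            result = heavy_computation(element)  # stand-in for the real work
        yield result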
What would you recommend? How can I limit the number of bundles processed in parallel - ideally just one bundle per worker harness thread? Is Beam/Dataflow suitable for such a use case, or is it better to achieve this with plain Kubernetes with autoscaling?
EDIT:
Running on Beam SDK 2.28.0
I'd like to limit the parallelism, but I have not described well the symptoms that led me to that conclusion.
- Sometimes I get timeouts in the Fetch stage:
  Deadline of 60.0s exceeded while calling functools.partial(<bound method PartialRowsData._read_next of <google.cloud.bigtable.row_data.PartialRowsData object at 0x7f16b405ba50>>)
- Processing of one element in the Process stage slows down significantly (to minutes instead of seconds) and sometimes it even gets stuck (probably because of memory problems).
Below are logs from one worker, logged before and after processing of one element in the Process stage (single-threaded), filtered by jsonPayload.worker and jsonPayload.portability_worker_id (i.e. I hope these are logs from a single container). I can see many more than 12 elements being processed at a single moment.

