0 votes

I have a Beam streaming job running on the Dataflow runner. It reads requests from PubSub (using Python's apache_beam.io.ReadFromPubSub), then fetches data from BigTable, does a heavy computation on the data, and writes the result back to PubSub.

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Receive" >> beam.io.ReadFromPubSub(topic=TOPIC_READ)
            | "Parse" >> beam.ParDo(Parse())
            | "Fetch" >> beam.ParDo(FetchFromBigtable(project, args.bt_instance, args.bt_par, args.bt_batch))
            | "Process" >> beam.ParDo(Process())
            | "Publish" >> beam.io.WriteToPubSub(topic=TOPIC_WRITE)
        )

Basically I don't need any windowing; I would just like to limit the number of elements processed in parallel on one machine (i.e. control the parallelism via the number of workers). Otherwise the heavy computation causes out-of-memory errors, and I also need to limit the rate of BigTable requests.

I'm using a standard 2-CPU machine, so I would expect it to process 2 elements in parallel - I also set --number_of_worker_harness_threads=2 and --sdk_worker_parallelism=1. For some reason, though, I see many elements processed in parallel by multiple threads, which causes the memory and rate-limit problems. Based on the logs (e.g. work: "process_bundle-105"), I guess those are bundles being processed in parallel.
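
For reference, the relevant options can be constructed roughly like this (a simplified sketch; project, region, temp location and the other Dataflow settings are omitted):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Simplified sketch: only the parallelism-related flags mentioned above
    # are shown; project/region/temp_location etc. depend on your setup.
    pipeline_options = PipelineOptions([
        "--runner=DataflowRunner",
        "--streaming",
        "--machine_type=n1-standard-2",
        "--number_of_worker_harness_threads=2",
        "--sdk_worker_parallelism=1",
    ])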


I tried to hack around it by using a semaphore inside the DoFn's process method (to process just one element per DoFn instance) and it works, but autoscaling does not kick in, and it looks like a pure hack which may have other consequences.
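
The hack boils down to a semaphore shared by the DoFn threads in one worker process - a simplified sketch of the idea (not my exact code; heavy_compute stands in for the real computation):

    import threading
    import apache_beam as beam

    # Shared by all DoFn instances (threads) in one worker process, so only
    # one element is being processed at a time in that process.
    _SEMAPHORE = threading.Semaphore(1)

    class Process(beam.DoFn):
        def process(self, element):
            with _SEMAPHORE:  # serialize the heavy computation
                yield heavy_compute(element)  # stand-in for the real work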

What would you recommend? How can I limit the number of bundles processed in parallel - ideally just one bundle per worker harness thread? Is Beam/Dataflow suitable for such a use case, or is it better to achieve this with plain Kubernetes with autoscaling?

EDIT:

Running on Beam SDK 2.28.0

I'd like to limit the parallelism, but I have not described well the symptoms that led me to that conclusion.

  1. Sometimes I get timeouts in the Fetch stage:
     Deadline of 60.0s exceeded while calling functools.partial(<bound method PartialRowsData._read_next of <google.cloud.bigtable.row_data.PartialRowsData object at 0x7f16b405ba50>>)
  2. Processing of one element in the Process stage slows down significantly (to minutes instead of seconds) and sometimes it even gets stuck (probably because of memory problems).

Below are logs from one worker, logged before and after the processing of one element in the Process stage (single-threaded), filtered by jsonPayload.worker and jsonPayload.portability_worker_id (i.e. I hope those are the logs of a single container). I can see many more than 12 elements being processed at a single moment.

Logs of process stage


2 Answers

1 vote

I have had success solving this same kind of problem for Dataflow and Elasticsearch by making use of Stateful Processing. You could use GroupIntoBatches to reduce the parallelism if your sink cannot keep up with the pace of the rest of the pipeline.

As far as I understand, states are maintained by the runner on a per-key-per-window basis. To use stateful processing, your data will need to have keys. Those keys can be arbitrary and ignored by the DoFn you use to consume the elements.

You mentioned that you don't need windowing; if you're not using any windowing currently, that implies you're using the single global window. In this case, whatever number of distinct keys you arbitrarily assign to your data will be the maximum number of parallel states maintained. Just be aware that this solution won't be portable to every runner, since stateful processing is not universally supported.
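
A minimal sketch of the idea, adapted to the pipeline from the question (the number of keys and the batch size are arbitrary values to tune, and ProcessBatch is a hypothetical DoFn that consumes a whole (key, batch) pair):

    import random
    import apache_beam as beam

    MAX_KEYS = 2      # arbitrary: caps the number of per-key states in flight
    BATCH_SIZE = 10   # arbitrary: tune to what BigTable / the sink can handle

    (
        pipeline
        | "Receive" >> beam.io.ReadFromPubSub(topic=TOPIC_READ)
        | "Parse" >> beam.ParDo(Parse())
        # The key is arbitrary; it only exists to bound how many per-key
        # states the runner maintains.
        | "AddKey" >> beam.Map(lambda element: (random.randrange(MAX_KEYS), element))
        | "Batch" >> beam.GroupIntoBatches(BATCH_SIZE)
        # Each output element is (key, [elements]); the key can be ignored
        # by the downstream DoFn, which now receives batches.
        | "FetchAndProcess" >> beam.ParDo(ProcessBatch())
        | "Publish" >> beam.io.WriteToPubSub(topic=TOPIC_WRITE)
    )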

1 vote

Dataflow launches one SDK worker container per core, so in your case there will be 2 worker containers (processes) per machine. Each worker process has an unbounded thread pool for processing bundles, but I think only one bundle actually gets processed by one thread at a time due to the Python GIL.

You can pass --experiments=no_use_multiple_sdk_containers to limit the number of SDK containers to one (since it seems that your use case does not care much about throughput).
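
For example, the experiment can be added programmatically as well as on the command line (a small sketch, assuming pipeline_options is the PipelineOptions object from the question):

    from apache_beam.options.pipeline_options import DebugOptions

    # Equivalent to passing --experiments=no_use_multiple_sdk_containers
    pipeline_options.view_as(DebugOptions).add_experiment(
        "no_use_multiple_sdk_containers")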