1
votes

I've been trying to debug a pipeline which takes a input parameter that drives subsequent ParDo operations. For reasons that I cannot fathom, the pipeline will not scale beyond a single worker even though I've disabled autoscaling and set the number of workers. Sadly the abysmal dataflow interface on GCP does little to illuminate the inability to scale. Can anyone advise as to what might be the issue or how to debug effectively?

with beam.Pipeline(options=opts) as p:
  result = (
      p | "Initialize Pipeline" >> beam.Create(
          [(f'gs://data/']) |
      "Scan for extraction tasks" >> beam.ParDo(scanner.ScanForTasks()) |
      "Extract data" >> beam.ParDo(worker.TaskToData()))
1

1 Answers

2
votes

The problem turned out to be related to an optimization in dataflow called 'fusion' in which adjacent operations are fused together, presumably so that they can run seamlessly on the same worker. The problem is that if a pipeline is seeded by a single item which generates a large number of downstream tasks, all these tasks will then be processed on the same worker that handled the initial seeding task.

The solution is to seed the pipeline directly with the tasks to prevent this 'optimization' from killing performance

def scan_for_tasks():
  tasks = []
  # Build your task list here
  return tasks

with beam.Pipeline(options=opts) as p:
  result = (
    p | "Initialize Pipeline" >> beam.Create(scan_for_tasks()) |
    "Extract data" >> beam.ParDo(worker.TaskToData()))