2
votes

I have a Dataflow pipeline processing an input of about 1 GB of data with two dicts as side_inputs. The goal is to calculate features from the main dataset with the help of those two side inputs.

Overall structure of the pipeline is as follows:

    import apache_beam as beam
    from apache_beam import pvalue

    p = beam.Pipeline()  # pipeline options omitted here

    # First side input, ends up as a 2GB dict with 3.5 million keys
    side_inp1 = (p |
        "read side_input1" >> beam.io.ReadFromAvro("$PATH/*.avro") |
        "to list of tuples 1" >> beam.Map(lambda row: (row["key"], row["value"]))  # labels must be unique within a pipeline
    )

    # Second side input, ends up as a 1.6GB dict with 4.5 million keys
    side_inp2 = (p |
        "read side_input2" >> beam.io.ReadFromAvro("$PATH2/*.avro") |
        "to list of tuples 2" >> beam.Map(lambda row: (row["key"], row["value"]))
    )

    # The main part of the pipeline, reading an avro dataset of 1 million rows -- 20GB
    (p |
     "read inputs" >> beam.io.ReadFromAvro("$MainPath/*.avro") |
     "main func" >> beam.Map(MyMapper, pvalue.AsDict(side_inp1), pvalue.AsDict(side_inp2))
     )

Here's the Dataflow graph:

[image: Dataflow graph]

And the "Featurize" step unwrapped: enter image description here

So Featurize is a function that looks up ids in the side inputs, .get()s the corresponding vectors, and computes around 180 different vector dot products to calculate features. It's a completely CPU-bound process and it's expected to take longer than the rest of the pipeline, but the stalling is what's strange here.
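
Roughly, the mapper does something like this (heavily simplified; the field names, the missing-id handling, and the exact dot-product loop are illustrative only):

    import numpy as np

    def MyMapper(row, side_dict1, side_dict2):
        # Illustrative only: look up this row's ids in the two side-input dicts;
        # dict.get() returns None when an id is missing.
        vec1 = side_dict1.get(row["id1"])
        vec2 = side_dict2.get(row["id2"])
        if vec1 is None or vec2 is None:
            return {"id": row.get("id1"), "features": None}

        v1, v2 = np.asarray(vec1), np.asarray(vec2)
        # The CPU-bound part: ~180 dot products over different slices of the vectors.
        features = {"feat_%d" % i: float(np.dot(v1[i:], v2[i:])) for i in range(180)}
        return {"id": row["id1"], "features": features}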

My problems are twofold:

  1. The Dataflow pipeline seems to slow down drastically as it moves further through the process. I don't know what the reasons are or how I can alleviate the problem. A throughput chart of the MyMapper step is shown below; note the declining throughput (from ~400 rows/sec down to ~1 row/sec at the end).

[image: throughput chart of the MyMapper step]

  2. The behavior of the side_inputs is also strange to me. I expected the side_inputs to be read once and only once, but when I check the Job Metrics / Throughput chart, I observe the following. As can be seen, the pipeline is constantly reading the side_inputs, whereas what I want is two dicts that are simply kept in memory.

[image: overall job throughput]

Other job configurations

  • zone: us-central1-a
  • machine_type: m1-ultramem-40 (40 CPU cores, 960GB RAM)
  • disk_type/size: ssd/50GB
  • experiments: shuffle-service enabled.
  • max_num_workers: 1 to help ease calculations and metrics, and not have them vary due to auto-scaling.
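
For reference, a sketch of how these settings map to Beam pipeline options for the Dataflow runner (the project, temp_location, and region values below are placeholders, not the real ones):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Sketch only: the job settings above expressed as pipeline flags.
    options = PipelineOptions([
        "--runner=DataflowRunner",
        "--project=my-project",                # placeholder
        "--temp_location=gs://my-bucket/tmp",  # placeholder
        "--region=us-central1",                # placeholder
        "--zone=us-central1-a",
        "--machine_type=m1-ultramem-40",
        "--disk_size_gb=50",
        "--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd",
        "--max_num_workers=1",
        "--experiments=shuffle_mode=service",
    ])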

Extra Observations

  • I'm constantly seeing log entries like the following in LogViewer: "[INFO] Completed workitem: 4867151000103436312 in 1069.056863785 seconds". All completed workItems so far have taken about 1000-1100 seconds, which is another source of confusion: why should throughput drop while processing each workItem takes the same time as before? Has parallelism dropped for some reason (maybe some hidden threading threshold that's out of my control, like harness_threads)?

  • In the later parts of the pipeline, the logs suggest the execution pattern is very sequential (it seems to execute one workItem, finish it, and then move on to the next), which is strange to me considering there is 1TB of available memory and 40 cores.

  • There are 0 errors or even warnings

Comments

Iñigo: That behavior normally means a constraint on disk size or memory. With Ultramem, I doubt memory is the issue. My suggestions: try to see at which step the pipeline is getting stuck, and look for OOMs and for logs similar to "Memtable full" or "L0 too many files". If you have side inputs of such a big size, maybe you need to specify --workerCacheMb=<XXX> (see the doc).

SpiXel: @Iñigo Thanks for pointing out the doc, it's something I'll experiment with and report back here. On the OOM/memory side, there have been no failures in the 4 hours the job has been running.

SpiXel: @Iñigo Do you know if that flag exists for the Python SDK? I can't find it in the code.

Iñigo: It's true, it doesn't look like it exists in Python, sorry. Maybe you could try using a bigger disk? 50GB doesn't look like much for the size of the worker.

SpiXel: I'm not sure why Dataflow would need more disk space, other than maybe for caching the side_inputs, and I've given it so much RAM that I don't even expect it to do that. Anyway, I also tried with 100GB and the behavior didn't change.

1 Answer

2
votes

The throughput chart in point 1 is a good indicator that the performance of your job decreased somehow.

The side input is intended to be held in memory; however, I'm not sure that a pipeline with only one highmem node is a good approach. With a single node, the pipeline can hit bottlenecks that are difficult to identify, e.g. network or OS limitations (such as the maximum number of open files allowed by the OS, which is related to the files loaded into memory). Because of Beam's architecture, adding more nodes is not a problem even with autoscaling enabled, since autoscaling automatically chooses the appropriate number of worker instances required to run your job. If you are worried about calculations and metrics for other reasons, please share them.

Regarding point 2, I think it is expected to see activity on the graph, since the side input (held in memory) is read for each element being processed. However, if this doesn't make sense to you, you can always add the complete job graph so we can understand any other details of the pipeline steps.

My recommendation is to add more workers to distribute the workload, since a PCollection is a distributed dataset that will be spread across the available nodes. You can keep similar computational resources with more nodes, for example 4 n2d-highmem-16 instances (16 vCPU, 128GB each); a sketch of the corresponding flags follows the list below. With these changes it is possible that any bottlenecks disappear or are mitigated; in addition, you can monitor the new job in the same way:

  • Remember to check for errors in your pipeline, so you can identify any other issues that are happening or causing the performance problem.

  • Check the CPU and memory usage in the Dataflow UI. If memory errors are happening at the job level, Stackdriver should show them as memory errors, but the memory on the host instance should also be checked to make sure it is not hitting an OS limit for other reasons.

  • You might want to check this example with side inputs as dictionaries. I'm not an expert, but you can follow the best practices in the example.
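
Not definitive, but a sketch of the flag changes for the suggested worker shape (all other options stay as in your current job):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Sketch: replace the single m1-ultramem-40 worker with four n2d-highmem-16
    # workers, as suggested above; other flags remain unchanged.
    options = PipelineOptions([
        "--machine_type=n2d-highmem-16",
        "--num_workers=4",
        "--max_num_workers=4",
    ])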

UPDATE

If the n2d-highmem-16 machines hit OOM, it seems to me that each harness thread might be using a copy of the dicts. I'm not quite sure whether configuring the number of threads will help, but you can try setting number_of_worker_harness_threads in the pipeline options.
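
A sketch of how that option could be set from the Python SDK (the value 8 is arbitrary, just to illustrate the flag):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Sketch: cap the number of SDK harness threads per worker, so fewer elements
    # are processed concurrently (and fewer live copies of the big dicts exist).
    options = PipelineOptions(["--number_of_worker_harness_threads=8"])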

On the other hand, can you expand the Featurize step? The wall time is very high in this step (~6 days); let's check which composite transforms absorbed that latency. For the problematic composite transforms, please share the code snippets. To identify the composite transforms that might have issues, please refer to the Side Input Metrics, especially "Time spent writing" and "Time spent reading".