I have a Dataflow pipeline that processes an input of about 1GB of data, with two dicts as side inputs. The goal is to compute features from the main dataset with the help of those two side inputs.
The overall structure of the pipeline is as follows:
```python
import apache_beam as beam
from apache_beam import pvalue

# `p` is the beam.Pipeline; MyMapper is the featurization function (not shown).
# Transform labels must be unique within a pipeline, so each Map gets its own label.

# First side input, ends up as a 2GB dict with 3.5 million keys
side_inp1 = (
    p
    | "read side_input1" >> beam.io.ReadFromAvro("$PATH/*.avro")
    | "side_input1 to tuples" >> beam.Map(lambda row: (row["key"], row["value"]))
)

# Second side input, ends up as a 1.6GB dict with 4.5 million keys
side_inp2 = (
    p
    | "read side_input2" >> beam.io.ReadFromAvro("$PATH2/*.avro")
    | "side_input2 to tuples" >> beam.Map(lambda row: (row["key"], row["value"]))
)

# The main part of the pipeline, reading an avro dataset of 1 million rows -- 20GB
(
    p
    | "read inputs" >> beam.io.ReadFromAvro("$MainPath/*.avro")
    | "main func" >> beam.Map(MyMapper, pvalue.AsDict(side_inp1), pvalue.AsDict(side_inp2))
)
```
Here's the Dataflow graph:
And the "Featurize" step unwrapped:
So Featurize is a function that looks up ids in the side inputs, `.get`s the corresponding vectors, and computes about 180 different vector dot products to produce features. It's a completely CPU-bound process and is expected to take longer than the rest of the pipeline, but the stalling is what's strange here.
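To make the shape of that step concrete, here is a minimal sketch of a Featurize-like function, assuming each row has a hypothetical `ids` field and that both side-input dicts map an id to a list of floats. The field name, the feature names, and the "every side1 vector against every side2 vector" pairing are illustrative assumptions, not the actual logic of `MyMapper`.

```python
from typing import Dict, List, Mapping, Optional, Sequence

Vector = Sequence[float]


def dot(a: Vector, b: Vector) -> float:
    """Plain dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def featurize(
    row: Mapping[str, List[str]],
    side1: Mapping[str, Vector],
    side2: Mapping[str, Vector],
) -> Dict[str, float]:
    """Hypothetical Featurize: look up each id in both side-input dicts and
    emit one dot-product feature per (side1 vector, side2 vector) pair."""
    features: Dict[str, float] = {}
    ids = row["ids"]  # hypothetical field holding the ids to look up
    for i in ids:
        v1: Optional[Vector] = side1.get(i)  # .get avoids KeyError on missing ids
        if v1 is None:
            continue
        for j in ids:
            v2 = side2.get(j)
            if v2 is None:
                continue
            features[f"dot_{i}_{j}"] = dot(v1, v2)
    return features
```

With Beam, a function with this signature would be wired up as `beam.Map(featurize, pvalue.AsDict(side_inp1), pvalue.AsDict(side_inp2))`, so each call receives the materialized dicts as its extra arguments. Pure-Python loops like these are CPU-bound, which matches the behavior described above.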
My problems are twofold:
- The Dataflow pipeline seems to slow down drastically as it progresses. I don't know the reason or how to alleviate it. The throughput chart of the `MyMapper` step shows throughput declining from ~400 rows/sec to about 1 row/sec by the end, and I'm wondering what causes this decline.
- The behavior of the side inputs is also strange to me. I expected them to be read once and only once, but the Job Metrics / Throughput chart shows the pipeline constantly reading the side inputs, whereas what I want is just two dicts kept in memory.
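The "read once, keep two dicts in memory" expectation can be sketched as a per-process, thread-safe cache: the first caller loads the dict, and every later caller in the same process reuses it. This is a swapped-in pattern for illustration, not how Beam materializes side inputs internally; `get_shared` and the `loader` callable are hypothetical names. Beam's Python SDK also provides `apache_beam.utils.shared.Shared` for sharing one large object across threads in a worker process, which may be worth checking for your SDK version.

```python
import threading
from typing import Any, Callable, Dict, Hashable

# Module-level state: lives for the lifetime of the Python process.
_lock = threading.Lock()
_cache: Dict[Hashable, Any] = {}


def get_shared(name: Hashable, loader: Callable[[], Any]) -> Any:
    """Return the cached object for `name`, calling `loader` at most once
    per process even when many threads ask for it concurrently."""
    # Fast path: already loaded, no locking needed for a dict lookup.
    if name in _cache:
        return _cache[name]
    with _lock:
        # Re-check under the lock: another thread may have loaded it meanwhile.
        if name not in _cache:
            _cache[name] = loader()
        return _cache[name]
```

In a `DoFn`, such a helper would typically be called from `setup()` or lazily on first `process()` call, so the expensive load happens once per worker process rather than once per bundle.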
Other job configurations
- zone: us-central1-a
- machine_type: m1-ultramem-40 (40 CPU cores, 960GB RAM)
- disk_type/size: ssd/50GB
- experiments: shuffle-service enabled.
- max_num_workers: 1, to simplify calculations and metrics and keep them from varying due to autoscaling.
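For reference, the configuration above roughly corresponds to the following Beam Python pipeline flags. This is a sketch assuming a recent Beam Python SDK; project, region, and temp/staging locations are omitted, and the SSD disk type is left out because `--worker_disk_type` expects a full disk-type resource path that depends on your project. `dataflow_flags` is a hypothetical helper name.

```python
from typing import List


def dataflow_flags(max_workers: int = 1) -> List[str]:
    """Command-line flags mirroring the job configuration listed above.

    The result can be passed to
    apache_beam.options.pipeline_options.PipelineOptions(flags).
    """
    return [
        "--runner=DataflowRunner",
        "--worker_zone=us-central1-a",
        "--machine_type=m1-ultramem-40",
        "--disk_size_gb=50",
        "--experiments=shuffle_mode=service",  # Dataflow Shuffle service
        f"--max_num_workers={max_workers}",
    ]
```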
Extra Observations
I'm constantly seeing log entries like the following in the Logs Viewer:
[INFO] Completed workitem: 4867151000103436312 in 1069.056863785 seconds
All work items completed so far have taken about 1000-1100 seconds each, which is another source of confusion: why should throughput drop while each work item takes the same time as before? Has parallelism dropped for some reason (maybe some hidden threading limit outside my control, like harness_threads)? In the later parts of the pipeline, the logs suggest a very sequential execution pattern: it seems to execute one work item, finish it, then move to the next. That is strange to me, considering there's about 1TB of available memory and 40 cores.
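The puzzle above comes down to simple arithmetic: if each work item takes a roughly constant time, throughput can only fall if fewer work items run concurrently or each work item covers fewer rows. A minimal sketch of that relationship, using hypothetical numbers (10,000 rows per work item is an assumption, not a measured value):

```python
def expected_throughput(rows_per_item: float, seconds_per_item: float,
                        concurrent_items: int) -> float:
    """Approximate rows/sec when `concurrent_items` work items run in
    parallel, each processing `rows_per_item` rows in `seconds_per_item` s."""
    if seconds_per_item <= 0:
        raise ValueError("seconds_per_item must be positive")
    return concurrent_items * rows_per_item / seconds_per_item


# Hypothetical: same per-item duration, but 40 concurrent items vs. 1.
print(expected_throughput(10_000, 1000.0, 40))  # 400.0
print(expected_throughput(10_000, 1000.0, 1))   # 10.0
```

With the per-item duration held fixed, going from 40 concurrent work items to 1 cuts throughput by 40x, which is consistent with the sequential pattern seen in the logs.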
There are no errors or even warnings.