I was looking at the job graph of my recently deployed pipeline, and I noticed that a seemingly very simple DoFn appears to be taking a huge amount of processing time relative to the other tasks.
The pipeline takes streaming session-data messages, which carry both project_id and user_id identifiers, and generates hourly rollups aggregated at the project level, saving them to BigQuery. The pipeline processes ~1k JSON messages per second and runs on 10 n2-standard machines. Brief summary of the steps:
- Read from PubSub
- Window into 5-day fixed windows, with a repeating 3-hour trigger and AccumulationMode.ACCUMULATING
- Aggregate by user + a bunch of other fields (even though we ultimately do project-level rollups, this step must happen first to avoid overcounting certain fields)
- Simple DoFn that maps from (old keys), value to (new keys), value, removing user_id (called distinct_id in the code) from the list of keys so that we are now aggregating at the project level
class CollapseKeys(beam.DoFn):
    """
    Re-map data to the hour/org/project/release/env level (remove distinct_id key)
    """
    def process(self, elem, window=beam.DoFn.WindowParam):
        (timestamp, org_id, project_id, release, env, sdk, distinct_id), vals = elem
        yield (timestamp, org_id, project_id, release, env, sdk), vals
- Calculate hourly, project level rollups
- Save to BQ
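To make the intent of the last two aggregation steps concrete, here is a minimal pure-Python sketch (no Beam) of what collapsing the key and rolling up to the project level does together. The key fields and counts are hypothetical, just to show the shape of the data; it assumes the aggregated values are summable counts:

```python
from collections import defaultdict

# Hypothetical per-user aggregates keyed by
# (timestamp, org_id, project_id, release, env, sdk, distinct_id).
per_user = {
    ("2021-01-01T00", 1, 10, "r1", "prod", "py", "userA"): 3,
    ("2021-01-01T00", 1, 10, "r1", "prod", "py", "userB"): 5,
    ("2021-01-01T00", 1, 11, "r1", "prod", "py", "userA"): 2,
}

# Drop distinct_id from the key and sum: this is what CollapseKeys plus the
# downstream project-level rollup accomplish together.
rollup = defaultdict(int)
for (ts, org, proj, rel, env, sdk, _uid), count in per_user.items():
    rollup[(ts, org, proj, rel, env, sdk)] += count

print(dict(rollup))
# {('2021-01-01T00', 1, 10, 'r1', 'prod', 'py'): 8,
#  ('2021-01-01T00', 1, 11, 'r1', 'prod', 'py'): 2}
```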
As you can see from the picture, the Wall time (time spent on step) seems to be extremely high for the DoFn that is just mapping from one set of keys to another. Am I committing some vile Beam/Dataflow sin here? Is this just an artifact of the Dataflow UI? I am new to both Beam and Dataflow and would appreciate any advice.
| "Collapse to project" >> beam.ParDo(CollapseKeys())
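As an aside (a simplification, not a performance claim): since CollapseKeys is a pure one-to-one remap, it could also be written as a plain function applied with beam.Map instead of a DoFn with ParDo. A sketch, using a hypothetical sample element to show the expected shape:

```python
def collapse_keys(elem):
    """Drop distinct_id from the key tuple, keeping the value unchanged."""
    (timestamp, org_id, project_id, release, env, sdk, _distinct_id), vals = elem
    return (timestamp, org_id, project_id, release, env, sdk), vals

# In the pipeline this would replace the ParDo step, e.g.:
#   | "Collapse to project" >> beam.Map(collapse_keys)

# Hypothetical element shaped like the ones described above:
elem = (("2021-01-01T00", 1, 10, "r1", "prod", "py", "userA"), {"count": 3})
print(collapse_keys(elem))
# (('2021-01-01T00', 1, 10, 'r1', 'prod', 'py'), {'count': 3})
```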
– user3637516