
I was looking at the job graph of my recently deployed pipeline, and I noticed that a seemingly very simple DoFn appears to be taking a huge amount of processing time relative to the other tasks.

The pipeline takes streaming session data messages, which carry both project_id and user_id identifiers, generates hourly rollups aggregated at the project level, and saves them to BigQuery. The pipeline processes ~1k JSON messages per second and runs on 10 n2-standard machines. Brief summary of the steps:

  1. Read from PubSub
  2. Window - 5-day fixed windows with a repeating 3-hour trigger and AccumulationMode.ACCUMULATING (see the windowing sketch after this list)
  3. Aggregate by user + a bunch of other fields (even though we ultimately do project level rollups, this step must happen first to avoid overcounting certain fields)
  4. Simple DoFn that maps each (keys, value) pair to a new (keys, value) pair (removes user_id from the key tuple so that we are now aggregating at the project level):
class CollapseKeys(beam.DoFn):
    """
    Re-map data to the hour/org/project/release/env level (remove distinct_id key)
    """
    def process(self, elem, window=beam.DoFn.WindowParam):
        (timestamp, org_id, project_id, release, env, sdk, distinct_id), vals = elem
        yield (timestamp, org_id, project_id, release, env, sdk), vals
  5. Calculate hourly, project level rollups
  6. Save to BQ
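
For reference, step 2 would look roughly like the following; a minimal sketch, where messages stands in for the PCollection read from PubSub and the window/trigger sizes mirror the description above:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

# Sketch of the windowing step: 5-day fixed windows, a repeating 3-hour
# processing-time trigger, and accumulating panes. "messages" is assumed to
# be the PCollection of parsed session messages read from PubSub.
windowed = messages | "Window" >> beam.WindowInto(
    window.FixedWindows(5 * 24 * 60 * 60),  # 5 days, in seconds
    trigger=trigger.Repeatedly(trigger.AfterProcessingTime(3 * 60 * 60)),
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING)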

As you can see from the picture, the wall time (time spent on the step) seems to be extremely high for the DoFn that is just mapping from one set of keys to another. Am I committing some vile Beam/Dataflow sin here? Is this just an artifact of the Dataflow UI? I am new to both Beam and Dataflow and would appreciate any advice.

Job graph (the simple DoFn is the "Collapse to project" step)

Hi, I think this can be normal. There is probably a "group by" operation in the "Collapse to project" step which triggers a shuffle between the machines. That means network traffic, which is costly in time and can increase the wall time of the step. - jszule
Hi @jszule, thanks for the response! I should have been clearer in the question, but there's no "group by" operation happening in the collapse to project step. "Collapse to project" is just running the DoFn I outlined in my question: | "Collapse to project" >> beam.ParDo(CollapseKeys()) - user3637516
@user3637516 I see that you have set the number of workers to 10. However, it seems that your CollapseKeys() step may need more than 10 workers. Have you considered using autoscaling with a maximum number of workers instead of running 10 workers for the whole job? - Alexandre Moraes
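
For context, the autoscaling suggested above is controlled through Dataflow pipeline options rather than DoFn code; a minimal sketch, with placeholder values that are not from the original job:

from apache_beam.options.pipeline_options import PipelineOptions

# Let Dataflow scale workers up to a cap instead of pinning the count at 10.
# The cap of 30 is a placeholder, not a recommendation for this pipeline.
options = PipelineOptions(
    runner="DataflowRunner",
    autoscaling_algorithm="THROUGHPUT_BASED",
    max_num_workers=30,
)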
If that is literally your DoFn, this is surprising. (One place this can crop up is if your DoFn is iterating over the (possibly large) set of values after a GroupByKey, which induces the actual GBK reads, but you don't even seem to be doing that here...) - robertwb
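
For reference, the situation robertwb describes looks roughly like the hypothetical DoFn below (not the one from the question): after a GroupByKey the grouped values arrive as a lazy iterable, and walking through them is what actually pulls the shuffled data, so that cost shows up as wall time on the iterating step.

import apache_beam as beam

class SumGroupedValues(beam.DoFn):
    # Hypothetical example, not the asker's code.
    def process(self, elem):
        key, values = elem
        yield key, sum(values)  # iterating values forces the GBK reads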

1 Answer


You can use the options at https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L890 to grab a profile and figure out where your time is being spent in specific DoFns.
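
For example, a minimal sketch of enabling CPU profiling via those options (the GCS path is a placeholder):

from apache_beam.options.pipeline_options import PipelineOptions

# With these set, the Dataflow workers write cProfile output to the given
# GCS location, which should show where time goes inside individual DoFns.
options = PipelineOptions(
    profile_cpu=True,
    profile_location="gs://your-bucket/profiles",
)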