I'm trying to set up a Dataflow streaming pipeline in Python. I have quite some experience with batch pipelines. Our basic architecture looks like this: [image: architecture diagram]

The first step does some basic processing and takes about 2 seconds per message to reach the windowing step. We are using sliding windows of 3 seconds with a 3-second interval (this might change later so that the windows overlap). The last step is the SOG prediction, which takes about 15 seconds to process and is clearly our bottleneck transform.

The issue we seem to face is that the workload is distributed evenly over our workers before the windowing, but the most important transform is not distributed at all. All the windows are processed one at a time, seemingly on a single worker, while we have 50 available.

The logs show that the SOG prediction step produces an output roughly once every 15 seconds, which should not be the case if the windows were processed across multiple workers. This builds up a large latency over time, which we don't want. With 1 minute of messages, the last window has a latency of 5 minutes. If the distribution worked, this should only be around 15 seconds (the SOG prediction time). So at this point we are clueless.

Does anyone see something wrong with our code, or know how to prevent or circumvent this? It seems like this happens in the internals of Google Cloud Dataflow. Does this also occur in Java streaming pipelines?

In batch mode, everything works fine. There, one could try a reshuffle to make sure no fusion etc. occurs. But that is not possible after windowing in streaming.

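# Imports used by the snippet below
import logging
import sys

import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterCount

args = parse_arguments(sys.argv if argv is None else argv)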
pipeline_options = get_pipeline_options(project=args.project_id,
                                        job_name='XX',
                                        num_workers=args.workers,
                                        max_num_workers=MAX_NUM_WORKERS,
                                        disk_size_gb=DISK_SIZE_GB,
                                        local=args.local,
                                        streaming=args.streaming)

pipeline = beam.Pipeline(options=pipeline_options)

# Build pipeline
# pylint: disable=C0330
if args.streaming:
    frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
        subscription=SUBSCRIPTION_PATH,
        with_attributes=True,
        timestamp_attribute='timestamp'
    ))

    frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
        create_frame_tuples_fn)

crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
    pred_bbox_tfserv_fn, SERVER_URL)

sliding_windows = bboxs | 'Window' >> beam.WindowInto(
                  beam.window.SlidingWindows(
                        FEATURE_WINDOWS['goal']['window_size'],
                        FEATURE_WINDOWS['goal']['window_interval']),
                  trigger=AfterCount(30),
                  accumulation_mode=AccumulationMode.DISCARDING)

# GROUPBYKEY (per match)
group_per_match = sliding_windows | 'Group' >> beam.GroupByKey()
_ = group_per_match | 'LogPerMatch' >> beam.Map(lambda x: logging.info(
    "window per match per timewindow: # %s, %s", str(len(x[1])), x[1][0][
        'timestamp']))

sog = sliding_windows | 'Predict SOG' >> beam.Map(predict_sog_fn,
                                                  SERVER_URL_INCEPTION,
                                                  SERVER_URL_SOG)

pipeline.run().wait_until_finish()

2 Answers

6
votes

In Beam, the unit of parallelism is the key: all the windows for a given key will be produced on the same machine. However, if you have 50+ keys, they should get distributed among all workers.

You mentioned that you were unable to add a Reshuffle in streaming. This should be possible; if you're getting errors please file a bug at https://issues.apache.org/jira/projects/BEAM/issues . Does re-windowing into GlobalWindows make the issue with reshuffling go away?

2
votes

It looks like you do not necessarily need GroupByKey, because you are always grouping on the same key. Instead, you could maybe use CombineGlobally to append all the elements inside the window, rather than a GroupByKey that always uses the same key.

combined = values | beam.CombineGlobally(append_fn).without_defaults()
combined | beam.ParDo(PostProcessFn())

I am not sure how the load distribution works when using CombineGlobally, but since it does not process key-value pairs, I would expect another mechanism to do the load distribution.