I'm trying to set up a dataflow streaming pipeline in python. I have quite some experience with batch pipelines. Our basic architecture looks like this:
The first step is doing some basic processing and takes about 2 seconds per message to get to the windowing. We are using sliding windows of 3 seconds and 3 second interval (might change later so we have overlapping windows). As last step we have the SOG prediction that takes about 15ish seconds to process and which is clearly our bottleneck transform.
So, The issue we seem to face is that the workload is perfectly distributed over our workers before the windowing, but the most important transform is not distributed at all. All the windows are processed one at a time seemingly on 1 worker, while we have 50 available.
The logs show us that the sog prediction step has an output once every 15ish seconds which should not be the case if the windows would be processed over more workers, so this builds up huge latency over time which we don't want. With 1 minute of messages, we have a latency of 5 minutes for the last window. When distribution would work, this should only be around 15sec (the SOG prediction time). So at this point we are clueless..
Does anyone see if there is something wrong with our code or how to prevent/circumvent this? It seems like this is something happening in the internals of google cloud dataflow. Does this also occur in java streaming pipelines?
In batch mode, Everything works fine. There, one could try to do a reshuffle to make sure no fusion etc occurs. But that is not possible after windowing in streaming.
args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
job_name='XX',
num_workers=args.workers,
max_num_workers=MAX_NUM_WORKERS,
disk_size_gb=DISK_SIZE_GB,
local=args.local,
streaming=args.streaming)
pipeline = beam.Pipeline(options=pipeline_options)
# Build pipeline
# pylint: disable=C0330
if args.streaming:
frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
subscription=SUBSCRIPTION_PATH,
with_attributes=True,
timestamp_attribute='timestamp'
))
frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
create_frame_tuples_fn)
crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
pred_bbox_tfserv_fn, SERVER_URL)
sliding_windows = bboxs | 'Window' >> beam.WindowInto(
beam.window.SlidingWindows(
FEATURE_WINDOWS['goal']['window_size'],
FEATURE_WINDOWS['goal']['window_interval']),
trigger=AfterCount(30),
accumulation_mode=AccumulationMode.DISCARDING)
# GROUPBYKEY (per match)
group_per_match = sliding_windows | 'Group' >> beam.GroupByKey()
_ = group_per_match | 'LogPerMatch' >> beam.Map(lambda x: logging.info(
"window per match per timewindow: # %s, %s", str(len(x[1])), x[1][0][
'timestamp']))
sog = sliding_windows | 'Predict SOG' >> beam.Map(predict_sog_fn,
SERVER_URL_INCEPTION,
SERVER_URL_SOG )
pipeline.run().wait_until_finish()