We are currently working on a streaming pipeline on Apache Beam with DataflowRunner. We read messages from Pub/Sub, do some processing on them, and afterwards window them into sliding windows (currently both the window size and the window period are 3 seconds). Once a window fires we do some post-processing on the elements inside it. This post-processing step takes significantly longer than the window size, about 15 seconds.
The Apache Beam code of the pipeline:

import apache_beam as beam
from apache_beam.transforms.trigger import AfterCount, AccumulationMode

input = (pipeline
         | beam.io.ReadFromPubSub(subscription=<subscription_path>)
         | beam.Map(process_fn))  # process_fn outputs (key, value) pairs for the GroupByKey below

windows = (input
           | beam.WindowInto(beam.window.SlidingWindows(3, 3),
                             trigger=AfterCount(30),
                             accumulation_mode=AccumulationMode.DISCARDING))

group = windows | beam.GroupByKey()

group | beam.Map(post_processing_fn)
As you know, Dataflow tries to perform some optimizations on your pipeline steps. In our case it fuses everything together from the windowing onwards (fused stages: 1/ processing, 2/ windowing + post-processing), which causes slow, sequential post-processing of all the windows by just one worker. Every 15 seconds we see a log line showing that the pipeline is processing the next window. However, we would like multiple workers to pick up separate windows instead of the whole workload going to a single worker.
Therefore we were looking for ways to prevent this fusion from happening, so that Dataflow separates the windowing from the post-processing of the windows. That way we would expect Dataflow to be able to assign multiple workers again to the post-processing of fired windows.
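One fusion-breaking option we are still considering is Beam's built-in Reshuffle transform, which is commonly used to force a materialization point between steps. A minimal sketch of where it would sit in our pipeline, not yet tested in our setup:

group = windows | beam.GroupByKey()
# Hypothetical fusion break: Reshuffle should force Dataflow to materialize the
# grouped windows, so the post-processing step can be rebalanced across workers.
reshuffled = group | beam.Reshuffle()
reshuffled | beam.Map(post_processing_fn)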
What we have tried so far:
- Increasing the number of workers to 20, 30 or even 40, but without effect: only the steps before the windowing get assigned to multiple workers
- Running the pipeline for 5 or 10 minutes, but we saw no workers being reallocated to help with the larger post-processing step after the windowing
- After the windowing, putting the elements back into a global window
- Simulating another GroupByKey with a dummy key (as mentioned in https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion), but without any success; a rough sketch of this attempt is shown after this list
The last two actions indeed created a third fused stage (1/ processing, 2/ windowing, 3/ post-processing), but we noticed that the same single worker still executes everything after the windowing.
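For reference, the dummy-key variant we tried looked roughly like the sketch below; add_dummy_key is a placeholder for our own helper, shown only to illustrate the shape of the attempt:

import random

def add_dummy_key(window_contents):
    # window_contents is a (key, values) pair produced by the first GroupByKey;
    # spread these pairs over a handful of random dummy keys.
    return (random.randint(0, 9), window_contents)

fusion_break = (group
                | 'AddDummyKey' >> beam.Map(add_dummy_key)
                | 'DummyGroupByKey' >> beam.GroupByKey()
                | 'DropDummyKey' >> beam.FlatMap(lambda kv: kv[1]))
fusion_break | beam.Map(post_processing_fn)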
Is there any solution that resolves this problem?
The workaround we are currently considering is to build a second streaming pipeline that receives the fired windows, so that its workers can process the windows in parallel, but that is cumbersome.