Purpose:
I want to load streaming data, add a key to each element, and then count the elements by key.
Problem:
My Apache Beam Dataflow pipeline gets a memory error when I try to load and group-by-key a large amount of data using the streaming approach (unbounded data). It seems that the data accumulates in the group-by step and is not fired early by the triggering of each window.
If I decrease the element size (the element count does not change), it works, because the group-by step actually waits for all the data to be grouped and only then fires all of the new windowed data.
I tested with both:
- beam version 2.11.0 and scio version 0.7.4
- beam version 2.6.0 and scio version 0.6.1
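For reference, a minimal build.sbt sketch of the dependencies used, assuming the standard scio and Beam Maven coordinates (scio normally pins a matching Beam version transitively):

// the two version combinations tested
libraryDependencies ++= Seq(
  "com.spotify" %% "scio-core" % "0.7.4", // or "0.6.1"
  "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % "2.11.0" // or "2.6.0"
)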
The way to reproduce the error (a minimal code sketch follows this list):
- Read a Pub/Sub message that contains a file name
- Read and load the related file from GCS as a row-by-row iterator
- Flatten it row by row (so it generates around 10,000 elements)
- Add timestamps (the current instant) to the elements
- Create key-value pairs from my data (with random integer keys from 1 to 10)
- Apply a window with triggering (it triggers around 50 times in the case where the rows are small and there is no memory problem)
- Count per key (group by key, then combine)
- Finally we are supposed to have around 50 * 10 elements that represent counts by window and key (tested successfully when the row size is small enough)
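For concreteness, a minimal scio sketch of steps 1 to 5 (readGcsLines is a hypothetical helper standing in for the GCS row-by-row reader, and the topic name is a placeholder; the windowing of step 6 and the counting of step 7 are applied to this keyed collection as shown further down):

import com.spotify.scio._
import com.spotify.scio.values.SCollection
import org.joda.time.Instant
import scala.util.Random

object CountByKeyJob {
  // hypothetical helper: reads the named GCS file and returns its rows
  def readGcsLines(fileName: String): Iterator[String] = ???

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val data: SCollection[(Int, String)] = sc
      .pubsubTopic[String]("projects/my-project/topics/file-names") // 1. Pub/Sub message carrying a file name
      .flatMap(fileName => readGcsLines(fileName))                  // 2-3. load the file from GCS, flatten row by row (~10,000 elements)
      .timestampBy(_ => Instant.now())                              // 4. current instant as the element timestamp
      .map(row => (Random.nextInt(10) + 1, row))                    // 5. random integer keys from 1 to 10

    // 6-7. windowing with triggering and count per key: see the code below
    sc.close()
  }
}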
Visualization of the pipeline (steps 4 to 7):
Summary for the group-by-key step:
As you can see, the data accumulates in the group-by step and does not get emitted.
The windowing code is here:
import org.apache.beam.sdk.transforms.windowing._
import org.joda.time.Duration

// Session windows with a 1 ms gap; each pane fires after at least 10 elements
// or 1 ms of processing time, repeatedly, until the watermark passes the end of the window.
val windowedData = data.applyKvTransform(
  Window
    .into[myt](Sessions.withGapDuration(Duration.millis(1)))
    .triggering(
      Repeatedly
        .forever(
          AfterFirst.of(
            AfterPane.elementCountAtLeast(10),
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1))))
        .orFinally(AfterWatermark.pastEndOfWindow()))
    .withAllowedLateness(Duration.standardSeconds(100))
    .discardingFiredPanes()
)
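The count-per-key step (step 7) is then applied to this windowed collection; a minimal sketch, assuming windowedData is still a keyed SCollection and using scio's combiner-based countByKey:

// 7. count elements per key within each fired pane
val countsPerKey = windowedData.countByKey // SCollection[(key, count)]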
The error:
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Is there any solution to this memory problem, perhaps by forcing the group-by step to emit early results for each window?