3
votes

purpose:

I want to load stream data, then add a key and then count them by key.

problem:

Apache Beam Dataflow pipline gets a memory error when i try to load and group-by-key a big-size data using streaming approach (unbounded data) . Because it seems that data is accumulated in group-by and it does not fire data earlier with triggering of each window.

If I decrease the elements size (elements count will not change) it works! because actually group-by step waits for all the data to be grouped and then fire all the new windowed data.

I tested with both:

beam version 2.11.0 and scio version 0.7.4

beam version 2.6.0 and scio version 0.6.1

The way to regenerate the error:

  1. Read a Pubsub message that contains file name
  2. Read and load the related file from GCS as a row by row iterator
  3. Flatten row by row (so it generates around 10,000) elements
  4. Add timestamps (current instant time) to elements
  5. Create a key-value of my data (with some random integer keys from 1 to 10)
  6. Apply window with triggering (it will trigger around 50 times in the case when rows are small and no memory problem)
  7. Count per key ( group by key then combine them )
  8. Finally we supposed to have around 50 * 10 elements that represent counts by window and key (tested successfully when rows size are small enough)

Visualization of the pipeline ( steps 4 to 7 ):

enter image description here

Summary for group-by-key step :

enter image description here

As you can see the data is accumulated in group-by step and does not get emitted.

Windowing code is here :

val windowedData = data.applyKvTransform(
  Window.into[myt](
    Sessions.withGapDuration(Duration.millis(1)))
    .triggering(
      Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(10),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))

      ).orFinally(AfterWatermark.pastEndOfWindow())

    ).withAllowedLateness(Duration.standardSeconds(100))
    .discardingFiredPanes()

)

The error:

org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

Is there any solution to solve the memory problem maybe by forcing group-by to emit early results of each window.

1
It would be helpful if you explained what you wanted to happen with your data and then we could discuss why the trigger is not doing what you expect (it is hard to reason about what you want to happen from only the trigger definition). Also, how is the watermark being updated upstream of AddTimestamps (what source are you using, do you see the watermark advancing)?Lukasz Cwik
@LukaszCwik thanks, I explained my purpose in the post. I found out that data watermark stoped advancing when it arrived to group-by-key step. if i try the same pipeline with small data-size the behavior of group-by step is the same: Watermark stops in group-by for around 2 minutes then advance(Because data size is ok and no error) after all data are present in group-by step (it seems data is accumulated)Saeed Mohtasham
Based upon your current trigger definition, it seems like your trying to output data every time you have at least 10 elements OR once you have seen at least 1 element and have waited for at least 1 millisecond and you want to stop producing output once the window is past the end of the watermark. Does this sound like what you want? Also, since your watermark isn't advancing, your FINALLY condition will never be met, what source are you using (Pubsub/Kafka/...)?Lukasz Cwik
@LukaszCwik yes exactly! The triggering happens but only when all the data grouped into their own window and key. group-by never output partial results of already computed panes produced by trigger. my input is PubSub messages containing CSV filenames and then I read and flatten the file to get Collection of rowsSaeed Mohtasham
Does your pubsub message have a timestamp attribute that you can use for watermark purposes or do you set the timestamp of the records only based upon data you have read from the CSV file?Lukasz Cwik

1 Answers

5
votes

The KeyCommitTooLargeException is not a memory problem but a protobuf serialization problem. Protobuf has a limit of 2GB for an object (google protobuf maximum size). Dataflow found that the value of a single key in the pipeline was larger than 2GB therefore it couldn't shuffle the data. The error message indicates that "This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element." Based on your pipeline setup (i.e., assigned random keys), it is more likely the latter.

The pipeline may have read a large file (>2GB) from GCS and assigned it to a random key. GroupByKey requires a key shuffle operation and Dataflow failed to do due to the protobuf limitation therefore stuck on that key and hold the watermark.

If a single key has large value, you may want to reduce the value size, for example, compress the string, or split the string to multiple keys, or generate smaller GCS file in the first place.

If the large value is from grouping of multiple keys, you may want to increase the key space so every group by key operations end up group fewer keys together.