0 votes

We need to group Pub/Sub messages by one of the fields from the messages. We used a fixed window of 15 minutes to group these messages.

When run on Dataflow, the GroupByKey used for grouping the messages introduces too many duplicate elements, and another GroupByKey at the far end of the pipeline fails with 'KeyCommitTooLargeException: Commit request for stage P27 and key abc#123 has size 225337153 which is more than the limit of..'

I have gone through the question below, where the suggestion was to use Reshuffle, but Reshuffle uses GroupByKey internally: Why is GroupByKey in beam pipeline duplicating elements (when run on Google Dataflow)?

My pipeline code:

PCollection<String> messages = getReadPubSubSubscription(options, pipeline);

PCollection<String> windowedMessages = messages
    .apply(
        Window
            .<String>into(
                FixedWindows.of(Duration.standardMinutes(15)))
            .discardingFiredPanes());
            
PCollectionTuple objectsTuple = windowedMessages
    .apply(
        "UnmarshalStrings",
        ParDo
            .of(new StringUnmarshallFn())
            .withOutputTags(
                StringUnmarshallFn.mainOutputTag,
                TupleTagList.of(StringUnmarshallFn.deadLetterTag)));

PCollection<KV<String, Iterable<ABCObject>>> groupedObjects =
    objectsTuple.get(StringUnmarshallFn.mainOutputTag)
        .apply(
            "GroupByObjects",
            GroupByKey.<String, ABCObject>create());

PCollection results = groupedObjects
    .apply(
        "FetchForEachKey",
        ParDo
            .of(new SomeFn())
            // SomeFn's tag setup is masked; withOutputTags is needed for .get(tag)
            .withOutputTags(SomeFn.tag, TupleTagList.empty()))
    .get(SomeFn.tag)
    .apply(
        "Reshuffle",
        Reshuffle.viaRandomKey());
                
results.apply(...)

...

Pub/Sub is definitely not duplicating the messages, and there are no additional failures; the GroupByKey is creating these duplicates. Is something wrong with the windowing I am using?

One observation is that GroupByKey produces the same number of elements as the next step produces. I am attaching two screenshots, one for the GroupByKey step and the other for the Fetch function.

[Screenshots: element counts for the GroupByKey step and the Fetch step]

UPDATE: After additional analysis

Stage P27 is actually the first GroupByKey, which is outputting many more elements than expected. I can't see them as duplicates of the actual output elements, since these millions of elements are not processed by the next Fetch step. I am not sure whether these are dummy elements introduced by Dataflow or a wrong metric from Dataflow.

I am still analyzing why this KeyCommitTooLargeException is thrown, since I only have one input element per key and grouping should produce only one iterable per key. I have opened a ticket with Google as well.

You set up discardingFiredPanes but you do not alter the trigger or allowed lateness, so this has no effect; see the sketch after these comments. – Kenn Knowles
From the code, objectsTuple is not used to obtain the following transform, which is strange because the graphs show otherwise. Also, you mentioned two GroupByKeys. As far as I understand, the first one is groupedObjects, the one returning duplicates; the other one, with the error, is inside Reshuffle. Don't you think priceChangesTuple.get(StringUnmarshallFn.mainOutputTag) might be doing something that duplicates data? – rsantiago
@rsantiago, thank you for pointing out the variable name. I had to mask some of the details to hide domain names and missed renaming it; I have updated the variable name now. As I continued my research, I found that stage P27 is actually the first GroupByKey itself. It was a little tricky to figure this out. I am adding the additional findings as an update on the question. – Praveen Billampati
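
For reference, a minimal sketch of a windowing setup where discardingFiredPanes actually takes effect, following Kenn Knowles' comment: a trigger and allowed lateness must be set for the accumulation mode to matter. The particular trigger and lateness values here are illustrative assumptions, not a recommendation for this pipeline:

PCollection<String> windowedMessages = messages
    .apply(
        Window
            .<String>into(FixedWindows.of(Duration.standardMinutes(15)))
            // Without an explicit trigger, discardingFiredPanes is a no-op.
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardMinutes(5))
            .discardingFiredPanes());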

2 Answers

0 votes

GroupByKey groups by key and window. Without a trigger, it outputs just one element per key and window, which is also at most one output element per input element.
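
A minimal sketch of that behavior (the keys, values, and timestamps here are illustrative assumptions): three elements sharing a key in the same fixed window come out of GroupByKey as exactly one KV per key and window under the default trigger.

PCollection<KV<String, Integer>> input = pipeline
    .apply(Create.timestamped(
        TimestampedValue.of(KV.of("abc", 1), new Instant(0)),
        TimestampedValue.of(KV.of("abc", 2), new Instant(1000)),
        TimestampedValue.of(KV.of("abc", 3), new Instant(2000))));

PCollection<KV<String, Iterable<Integer>>> grouped = input
    .apply(Window.<KV<String, Integer>>into(
        FixedWindows.of(Duration.standardMinutes(15))))
    // Default trigger: a single pane, so one output element per key and window.
    .apply(GroupByKey.<String, Integer>create());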

If you are seeing any other behavior, it may be a bug, and you can report it. You will probably need to provide steps to reproduce the issue, including example data and the entire runnable pipeline.

0 votes

Since in the UPDATE you clarified that there are no duplicates, but rather that dummy records are somehow being added (which is really strange), this old thread reports a similar issue, and its answer is interesting since it points to a protobuf serialization limit caused by grouping a very large amount of data in a single window.
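
If the root cause really is one key accumulating too much data in a single window, a common workaround (my own sketch, not something from the linked thread) is to shard hot keys before grouping so no single key's commit exceeds the limit. The shard count and the reuse of ABCObject here are assumptions for illustration; downstream steps would need to strip the shard suffix or re-aggregate per original key:

final int numShards = 10;  // assumed value; tune so per-key commits stay small

PCollection<KV<String, Iterable<ABCObject>>> groupedShards =
    objectsTuple.get(StringUnmarshallFn.mainOutputTag)
        .apply(
            "ShardKeys",
            MapElements.via(
                new SimpleFunction<KV<String, ABCObject>, KV<String, ABCObject>>() {
                  @Override
                  public KV<String, ABCObject> apply(KV<String, ABCObject> kv) {
                    // Append a random shard suffix so one hot key becomes N keys.
                    int shard = ThreadLocalRandom.current().nextInt(numShards);
                    return KV.of(kv.getKey() + "#shard" + shard, kv.getValue());
                  }
                }))
        .apply("GroupByShardedKey", GroupByKey.<String, ABCObject>create());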

I recommend using the available troubleshooting steps (e.g. 1 or 2) to identify in which part of the code the issue starts. For example, I still think that new StringUnmarshallFn() could be performing tasks that contribute to generating the dummy records. You might want to implement counters in your steps to try to identify how many records each step generates.
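
For instance, a sketch of such a counter (the class and step names are assumptions), using Beam's Metrics API so per-step counts show up under custom counters in the Dataflow UI:

static class CountingFn<T> extends DoFn<T, T> {
  private final Counter elementsOut;

  CountingFn(String stepName) {
    // Metrics.counter returns a serializable delegating counter.
    this.elementsOut = Metrics.counter(CountingFn.class, stepName + "-out");
  }

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<T> out) {
    elementsOut.inc();  // one increment per element passing through this step
    out.output(element);
  }
}

A pass-through like this can be inserted after each suspect step, e.g. .apply("CountGrouped", ParDo.of(new CountingFn<>("GroupByObjects"))), and the counts compared step by step.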

If you don't find a solution, the remaining option is to contact GCP Support; maybe they can figure it out.