We need to group Pub/Sub messages by one of their fields, and we use a fixed window of 15 minutes to group them.
When run on Dataflow, the GroupByKey used to group the messages appears to introduce a large number of duplicate elements, and another GroupByKey at the far end of the pipeline fails with 'KeyCommitTooLargeException: Commit request for stage P27 and key abc#123 has size 225337153 which is more than the limit of..'
I went through the question linked below, where the suggestion was to use Reshuffle, but Reshuffle uses a GroupByKey internally. Why is GroupByKey in beam pipeline duplicating elements (when run on Google Dataflow)?
My pipeline code:
PCollection<String> messages = getReadPubSubSubscription(options, pipeline);

PCollection<String> windowedMessages = messages
    .apply(
        Window
            .<String>into(
                FixedWindows.of(Duration.standardMinutes(15)))
            .discardingFiredPanes());

PCollectionTuple objectsTuple = windowedMessages
    .apply(
        "UnmarshalStrings",
        ParDo
            .of(new StringUnmarshallFn())
            .withOutputTags(
                StringUnmarshallFn.mainOutputTag,
                TupleTagList.of(StringUnmarshallFn.deadLetterTag)));

PCollection<KV<String, Iterable<ABCObject>>> groupedObjects =
    objectsTuple.get(StringUnmarshallFn.mainOutputTag)
        .apply(
            "GroupByObjects",
            GroupByKey.<String, ABCObject>create());

PCollection results = groupedObjects
    .apply(
        "FetchForEachKey",
        ParDo.of(SomeFn())).get(SomeFn.tag)
    .apply(
        "Reshuffle",
        Reshuffle.viaRandomKey());

results.apply(...)
...
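To make the expected behaviour concrete, here is a minimal plain-Java sketch (not Beam code) of what I assume the 15-minute fixed window plus GroupByKey should do: each message is assigned to the epoch-aligned window containing its timestamp, and values are collected once per (key, window) pair. The class FixedWindowGroupingSketch, the Message type and the "key@windowStart" group label are hypothetical names made up for this illustration, and it assumes non-negative timestamps in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: none of these names are Beam or Dataflow APIs.
final class FixedWindowGroupingSketch {
    // Size of the fixed window: 15 minutes in milliseconds.
    static final long WINDOW_MILLIS = 15 * 60 * 1000L;

    // Hypothetical message holder: grouping key, payload, event timestamp.
    static final class Message {
        final String key;
        final String payload;
        final long timestampMillis;

        Message(String key, String payload, long timestampMillis) {
            this.key = key;
            this.payload = payload;
            this.timestampMillis = timestampMillis;
        }
    }

    // Start of the epoch-aligned window containing the timestamp
    // (assumes a non-negative timestamp).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Collects payloads per (key, window start); each payload appears exactly once.
    static Map<String, List<String>> group(List<Message> messages) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (Message m : messages) {
            String groupKey = m.key + "@" + windowStart(m.timestampMillis);
            grouped.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(m.payload);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Message> input = new ArrayList<>();
        input.add(new Message("abc#123", "m1", 1_000L));
        input.add(new Message("abc#123", "m2", 899_999L));
        input.add(new Message("abc#123", "m3", 900_000L));
        // Three inputs for one key fall into two windows, so two groups are printed.
        System.out.println(group(input));
    }
}
```

Under this model the number of grouped outputs can never exceed the number of inputs, which is why the element counts reported for the GroupByKey step look wrong to me.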
Pub/Sub is definitely not duplicating messages, and there are no additional failures, so the duplicates seem to come from the GroupByKey. Is something wrong with the windowing I am using?
One observation is that the GroupByKey step reports the same number of elements as the next step produces. I am attaching two screenshots, one for the GroupByKey and the other for the fetch function.
UPDATE: After additional analysis
Stage P27 is actually the first GroupByKey, which is outputting many more elements than expected. I can't treat them as duplicates of the actual output element, because these million elements are not processed by the next Fetch step. I am not sure whether they are dummy elements introduced by Dataflow or an incorrect metric reported by Dataflow.
I am still analyzing why this KeyCommitTooLargeException is thrown, since I only have one input element and the grouping should produce a single-element iterable. I have opened a ticket with Google as well.
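One way I could sanity-check the "only one input element" assumption offline is to tally payload bytes per key from a sample of the input. The sketch below is plain Java, not Beam code. KeySizeTallySketch, bytesPerKey and exceeds are hypothetical names, and limitBytes is a caller-supplied threshold, not the actual Dataflow limit (the error message above is truncated before that value). UTF-8 payload length is only a rough proxy for the real commit size, which also includes encoding overhead.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: none of these names are Beam or Dataflow APIs.
final class KeySizeTallySketch {
    // Sums the UTF-8 byte length of payloads per key.
    // Each entry is a two-element array: {key, payload}.
    static Map<String, Long> bytesPerKey(List<String[]> keyedPayloads) {
        Map<String, Long> totals = new HashMap<>();
        for (String[] entry : keyedPayloads) {
            long size = entry[1].getBytes(StandardCharsets.UTF_8).length;
            totals.merge(entry[0], size, Long::sum);
        }
        return totals;
    }

    // True if the tallied bytes for the key are strictly above the given threshold.
    // limitBytes is an assumption supplied by the caller, not a documented limit.
    static boolean exceeds(Map<String, Long> totals, String key, long limitBytes) {
        return totals.getOrDefault(key, 0L) > limitBytes;
    }
}
```

If a single key such as abc#123 really received only one message, its tally should be nowhere near the 225337153 bytes reported in the exception.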
You set discardingFiredPanes but you do not alter the trigger or allowed lateness, so this has no effect. – Kenn Knowles
objectsTuple is not used to obtain the following transform, which is strange because the graphs show otherwise. Also, you mentioned two GroupByKey steps. As far as I understand, the first one is groupedObjects, the one returning duplicates, and the other one, with the error, is inside Reshuffle. Don't you think priceChangesTuple.get(StringUnmarshallFn.mainOutputTag) might be doing something that duplicates data? – rsantiago