I have an Apache Beam streaming job (running on GCP Dataflow) where I generate many individual operational KPI count objects (each representing a count of 1), window them for roughly 5 minutes, and then count them using the Combine.perKey transform. Once they are counted, I send them out to a remote managed service that stores these metrics and provides tools to visualize them and set alerts.
This is what the operational metric count object looks like:
import java.io.Serializable;
import java.util.LinkedHashMap;

public class OperationalReportSingleInput implements Serializable {
    // Labels that identify the metric this count belongs to.
    private LinkedHashMap<String, String> metricLabels;
    // String representation of metricLabels, used as the grouping key.
    private String metricLabelKey;
}
These operational metric count objects are aggregated/counted based on their "metric labels". The metricLabelKey field is simply a string representation of the metricLabels LinkedHashMap and is used as the key during the GroupByKey step of the Combine.perKey transform.
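The key function used below (generateSingleCountUniqueKey) is essentially a SerializableFunction that turns the label map into a string. A minimal sketch, assuming the key is simply the LinkedHashMap's toString() (the getMetricLabels() getter here is illustrative, not necessarily the exact implementation):

import org.apache.beam.sdk.transforms.SerializableFunction;

// Minimal sketch of the key function, not the exact implementation.
static class generateSingleCountUniqueKey
        implements SerializableFunction<OperationalReportSingleInput, String> {
    @Override
    public String apply(OperationalReportSingleInput input) {
        // LinkedHashMap preserves insertion order, so toString() yields a
        // deterministic key as long as labels are always inserted in the same order.
        return input.getMetricLabels().toString();
    }
}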
Here are all the steps in this process. I first window all of the OperationalReportSingleInput objects for roughly 5 minutes; once the window fires, I convert each OperationalReportSingleInput into a key/value pair of metricLabelKey -> OperationalReportSingleInput, count all the metrics per key, and then send them out to the remote managed service:
public static void aggregateAndSendMetricsToMetricsService(
        PCollection<OperationalReportSingleInput> individualCountObjects, JobConfig jobConfig) {
    individualCountObjects
        .apply("Window that aggregates counts for " + jobConfig.getMetricCollectionWindow() + " seconds",
            Window.<OperationalReportSingleInput>into(new GlobalWindows())
                .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(jobConfig.getMetricCollectionWindow()))))
                .discardingFiredPanes())
        .apply("Creating key/values for aggregation purposes.",
            WithKeys.of(new generateSingleCountUniqueKey()))
        .apply("Aggregating based on key.", Combine.perKey(new SingleCountAggregator()))
        .apply("Adding dummy key so all requests can be grouped and then processed by single machine.",
            ParDo.of(new ApplyDummyKey()))
        .apply(GroupByKey.create())
        .apply("Write metrics to Remote Metrics Service.",
            ParDo.of(new WriteMetricsToMetricsService(jobConfig)));
}
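SingleCountAggregator is essentially a CombineFn that adds one per input element. A minimal sketch (the Long accumulator here is illustrative, not necessarily the exact implementation):

import org.apache.beam.sdk.transforms.Combine;

// Minimal sketch of the per-key counter, not the exact implementation.
static class SingleCountAggregator
        extends Combine.CombineFn<OperationalReportSingleInput, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L; // start counting at zero
    }

    @Override
    public Long addInput(Long accumulator, OperationalReportSingleInput input) {
        return accumulator + 1; // each object represents a count of 1
    }

    @Override
    public Long mergeAccumulators(Iterable<Long> accumulators) {
        long total = 0;
        for (Long partial : accumulators) {
            total += partial; // partial counts from different bundles/workers
        }
        return total;
    }

    @Override
    public Long extractOutput(Long accumulator) {
        return accumulator;
    }
}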
When I run this code in Dataflow (autoscaling enabled) with a few hundred thousand records, it works as I expect: it aggregates all of the OperationalReportSingleInput count objects per metricLabelKey, so that in the end there is a single count per unique metricLabelKey. But when I run it with a few million records, the issues arise: I get multiple counts/outputs for a single unique metricLabelKey. It's almost as if, at a certain scale, the Combine transform's mergeAccumulators step malfunctions.
So instead of getting [{key1 -> 1500}, {key2 -> 456}] from the Combine.perKey step, I get something like [{key1 -> 1000}, {key1 -> 500}, {key2 -> 456}].
Has anyone ever experienced anything like this, or can you spot anything I am doing incorrectly?
Do you know which panes the key1 -> value mappings were part of? I have a similar situation where I get several early panes that are written to a remote service nearly at the same time, although I have a .plusDelayOf(10 seconds) on the early trigger. I have the impression that there are too many early panes flying around. – Stefan