I'm processing analytics hits in an Apache Beam pipeline written in Python. I'm using FixedWindows of 10 minutes, and I would like to trigger an alert (for example via Cloud Pub/Sub) when a window is empty. Here is what I have so far:

ten_min_windows = day_hits | '10MinutesWindows' >> beam.WindowInto(
    beam.window.FixedWindows(10 * 60))

ten_min_alerts = (ten_min_windows
    | 'CountTransactions10Min' >> beam.CombineGlobally(count_transactions).without_defaults()
    | 'KeepZeros10Min' >> beam.Filter(keep_zeros)
    | 'ConvertToAlerts10Min' >> beam.ParDo(ToAlert()))

count_transactions filters the hits to keep only transaction hits, then returns the length of the resulting list. keep_zeros returns true if that length is 0. The problem is that if the PCollection did not contain transaction hits, no length is returned at all, and I get an empty PCollection because of without_defaults(). It seems I cannot remove without_defaults(), as default values are not allowed when using non-global windows.

I've seen a thread advising to add a dummy element to each window, then check that the count is greater than one.

Is this the best solution, or is there a better way?

How can I do this, given that I need exactly one dummy element per window? Can I code this in the pipeline directly, or do I need to schedule a fake hit to be sent (for example through Cloud Pub/Sub) every 10 minutes?
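
To make the dummy-element idea concrete, here is a minimal sketch in plain Python, outside Beam. It assumes timestamps are integer seconds and that the time range to check is known in advance. The names window_start and empty_windows are hypothetical helpers for illustration, not Beam APIs. Each window gets exactly one dummy element, so a final count of 1 means no real hit landed in it.

```python
from collections import Counter

WINDOW_SIZE = 10 * 60  # seconds, matching FixedWindows(10 * 60)


def window_start(timestamp, size=WINDOW_SIZE):
    """Start of the fixed window that contains `timestamp` (in seconds)."""
    return timestamp - timestamp % size


def empty_windows(hit_timestamps, range_start, range_end, size=WINDOW_SIZE):
    """Return the starts of windows in [range_start, range_end) with no real hit."""
    counts = Counter()
    # One dummy element per window: every window now has a count of at least 1.
    for start in range(window_start(range_start, size), range_end, size):
        counts[start] += 1
    # Real hits are counted in the window they fall into.
    for ts in hit_timestamps:
        start = window_start(ts, size)
        if start in counts:
            counts[start] += 1
    # A count of exactly 1 means only the dummy element was present.
    return sorted(start for start, n in counts.items() if n == 1)
```

In a pipeline, the same check corresponds to merging one dummy element per window with the real hits before counting, then keeping the windows whose count equals 1.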

1 Answer

You can use Metrics.counter to track the number of elements processed and monitor it in Stackdriver, for example.

From there you can set up alerting, based on your own rules, in your favorite monitoring tool.