
We have a pipeline that reads data from a GCP Pub/Sub topic. We want to compute 15-minute aggregations (integrals and means) from that data. To do this, we apply FixedWindows, then add a dummy key, group by it, and drop it again, which gives us all the messages in a window as a single list. We then compute the aggregations in a custom DoFn class that uses pandas for the processing. Finally, we write the results to an InfluxDB database.

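import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub

# p, PUBSUB_TOPIC, WINDOW_SIZE (in seconds, i.e. 15 * 60), FormatInfluxDB
# and WriteToInfluxDB are defined elsewhere in our code.
(
    p
    | 'read_telemetry_from_pubsub' >> ReadFromPubSub(topic=PUBSUB_TOPIC)
    | 'window_telemetry' >> beam.WindowInto(beam.window.FixedWindows(WINDOW_SIZE))
    | 'format_telemetry_for_influx' >> beam.ParDo(FormatInfluxDB())
    # Dummy key so that GroupByKey collects every message of a window into one list
    | "add_dummy_key" >> beam.Map(lambda elem: (None, elem))
    | "groupby_dummy_key" >> beam.GroupByKey()
    | "delete_dummy_key" >> beam.MapTuple(lambda _, val: val)
    | "aggregate" >> beam.ParDo(Aggregator())
    | "write_processed_messages_to_influx" >> beam.ParDo(WriteToInfluxDB())
)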
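To make the windowing step concrete, here is a plain-Python sketch (no Beam dependency) of what FixedWindows followed by the dummy-key GroupByKey produces: each element lands in the window starting at `ts - ts % size` (the FixedWindows rule with zero offset), and all elements of one window end up in a single list. The message shape `(timestamp_seconds, value)` and the helper names are hypothetical, chosen only for illustration.

```python
from collections import defaultdict

WINDOW_SIZE = 15 * 60  # 15 minutes, in seconds (FixedWindows takes seconds)


def window_start(ts, size=WINDOW_SIZE):
    # FixedWindows rule with offset 0: align the timestamp down to a multiple of size.
    return ts - ts % size


def group_by_window(messages):
    """Simulate FixedWindows + dummy key + GroupByKey: one list per window.

    `messages` is an iterable of hypothetical (timestamp_seconds, value) pairs.
    """
    windows = defaultdict(list)
    for ts, value in messages:
        windows[window_start(ts)].append((ts, value))
    return dict(windows)


msgs = [(0, 1.0), (300, 2.0), (899, 3.0), (900, 4.0), (1805, 5.0)]
grouped = group_by_window(msgs)
print(sorted(grouped))  # [0, 900, 1800]
```

Each key is a window start; in Beam the corresponding window covers `[start, start + WINDOW_SIZE)`.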

This is the class that aggregates the list containing all the messages in a 15-minute window:

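import apache_beam as beam
import pandas as pd  # used by the (elided) aggregation logic

class Aggregator(beam.DoFn):
    def process(self, elements):
        # `elements` is the iterable of all messages in one window
        # (the grouped values left after the dummy key is dropped)
        # parse the message list into a pandas DataFrame
        # some preprocessing and aggregation steps
        # returns a list of JSON messages
        return [aggregated_values]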
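Since the actual aggregation code is elided here, the following is only a pandas-free sketch of what "integrals and means" over one window could look like. It assumes each message is a dict with hypothetical `"timestamp"` (seconds) and `"value"` keys, and it uses the trapezoidal rule for the integral; the real pipeline may compute these differently.

```python
def aggregate_window(elements):
    """Hypothetical aggregation of one window's messages.

    Assumes each message is a dict like {"timestamp": <seconds>, "value": <float>}.
    Returns the arithmetic mean and the trapezoidal time integral of "value".
    """
    # Sort by timestamp: messages inside a window are not guaranteed to be ordered.
    points = sorted((m["timestamp"], m["value"]) for m in elements)
    if not points:
        return None
    mean = sum(v for _, v in points) / len(points)
    # Trapezoidal rule: sum of (t1 - t0) * (v0 + v1) / 2 over consecutive points.
    integral = sum(
        (t1 - t0) * (v0 + v1) / 2
        for (t0, v0), (t1, v1) in zip(points, points[1:])
    )
    return {"mean": mean, "integral": integral, "count": len(points)}


window = [
    {"timestamp": 0, "value": 1.0},
    {"timestamp": 60, "value": 3.0},
    {"timestamp": 120, "value": 5.0},
]
print(aggregate_window(window))  # {'mean': 3.0, 'integral': 360.0, 'count': 3}
```

Inside a DoFn, `process` could simply `yield aggregate_window(elements)`; yielding and returning a list are both accepted by Beam.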

We use the GCP Pub/Sub emulator to test this code locally, and it works just fine. However, when we deploy it to GCP Dataflow, it doesn't emit any results and no errors appear in the logs. Additionally, we see that the data freshness metric grows indefinitely.

We suspect we are missing some trigger configuration, but we are not sure this is the correct approach for this kind of aggregation, since it emits results locally but not when deployed. When we use a trigger other than the default one, we don't get any output locally either.
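For context on why a pipeline can look silent: under Beam's default trigger, a window's result is emitted once the watermark passes the end of that window. Dataflow's data freshness reflects how far the output watermark lags behind real time, so a value that keeps growing suggests the watermark has stopped advancing, in which case no window would ever fire. The function below is a deliberately simplified model of that rule (hypothetical names, not Beam's implementation).

```python
WINDOW_SIZE = 15 * 60  # seconds


def ready_windows(window_starts, watermark, size=WINDOW_SIZE):
    """Return the windows whose end (start + size) the watermark has reached.

    Simplified model of the default watermark trigger: a window's result is
    emitted only after the watermark passes the end of the window.
    """
    return [s for s in sorted(window_starts) if s + size <= watermark]


starts = [0, 900, 1800]
print(ready_windows(starts, watermark=1000))  # [0]
print(ready_windows(starts, watermark=2700))  # [0, 900, 1800]
print(ready_windows(starts, watermark=0))     # [] (a stuck watermark emits nothing)
```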

We have tried several triggering options (Repeatedly, AfterProcessingTime, AccumulationMode.DISCARDING, AfterWatermark), as well as another approach using a custom Combine class, but none of them produced any output either.
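For reference, the Combine-based approach mentioned above centres on a combiner with Beam's `CombineFn` method names (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`). The sketch below implements a mean that way as a plain class so it runs without Beam; in a real pipeline it would subclass `apache_beam.CombineFn`, and with non-global windows `beam.CombineGlobally(...)` needs `.without_defaults()`. A trapezoidal integral does not fit this pattern as directly, because each term depends on neighbouring points in time order.

```python
class MeanCombineFn:
    """Mean as a combiner, mirroring the method names of Beam's CombineFn.

    Plain class (assumption: no Beam installed) so the logic can be run standalone.
    """

    def create_accumulator(self):
        return (0.0, 0)  # (running sum, count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        # Partial results from different bundles/workers are summed together.
        total, count = 0.0, 0
        for t, c in accumulators:
            total += t
            count += c
        return total, count

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")


fn = MeanCombineFn()
acc_a = fn.create_accumulator()
for v in [1.0, 2.0]:  # values seen by one hypothetical bundle
    acc_a = fn.add_input(acc_a, v)
acc_b = fn.create_accumulator()
for v in [3.0, 6.0]:  # values seen by another hypothetical bundle
    acc_b = fn.add_input(acc_b, v)
print(fn.extract_output(fn.merge_accumulators([acc_a, acc_b])))  # 3.0
```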

NOTE: for more detail, here is the complete code of the aggregating class.