I have built a pipeline that reads data from Pub/Sub and streams it to BigQuery. Before the data is streamed, it is preprocessed to remove duplicates. This is done by windowing into fixed windows and grouping the elements by an id. The pipeline works fine until, after some hours, it suddenly stops. There is nothing in the logs: no error messages or anything else. According to the job metrics, the data stops at the stage GroupByKey/MergeBuckets. Everything before that point keeps working, but the stage passes no data on. The job metrics show what looks like a hard cutoff.

Here is the code for the grouping step:

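from apache_beam import GroupByKey, Map, MapTuple, ParDo, PTransform, WindowInto
from apache_beam.transforms import window


class Transform(PTransform):

    def expand(self, pcoll):
        return (
            pcoll
            | "Extract data" >> ParDo(ExtractData())
            # FixedWindows takes its size in seconds, so these are 10-second windows.
            | "Window into Fixed Intervals" >> WindowInto(window.FixedWindows(10))
            | "Make dummy key" >> Map(lambda elem: (elem["sensor_id"], elem))
            | "Group by dummy key" >> GroupByKey()
            # After GroupByKey each value is an iterable of all elements for that key.
            | "Remove dummy key" >> MapTuple(lambda _, elem: elem)
        )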

The ExtractData function transforms the message from a JSON string to a dict.

1 Answer

I suspect that the timestamps of the messages read from Pub/Sub are set up such that a particular window is not triggered for a long time. Note that Beam uses event-time triggers by default, and the default trigger does not fire early: it fires only when the watermark reaches the end of the window. So when your windows are triggered depends on the timestamps of the events read from Pub/Sub. If you need output at regular intervals regardless of those timestamps (note that FixedWindows(10) in your code means 10-second windows, since the size is given in seconds), consider setting a processing-time based trigger.