I have built a pipeline that reads data from Pub/Sub and streams it into BigQuery. Before the data is streamed, it is preprocessed to remove duplicates: elements are windowed into fixed windows and grouped by an id. The pipeline works fine until, after some hours, it suddenly stops. There is nothing in the logs, no error messages or anything. According to the job metrics, the data stops at the stage GroupByKey/MergeBuckets. Everything up to that point works, but no data comes out of it, and as seen in the job metrics it looks like a hard cutoff.
Here is the code for the grouping step:
from apache_beam import GroupByKey, Map, MapTuple, ParDo, PTransform, WindowInto
from apache_beam.transforms import window

class Transform(PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            # Parse each Pub/Sub message into a dict
            | "Extract data" >> ParDo(ExtractData())
            # 10-second fixed windows, so grouping happens per window
            | "Window into Fixed Intervals" >> WindowInto(window.FixedWindows(10))
            | "Make dummy key" >> Map(lambda elem: (elem["sensor_id"], elem))
            | "Group by dummy key" >> GroupByKey()
            | "Remove dummy key" >> MapTuple(lambda _, elem: elem)
        )
The ExtractData transform turns the message from a JSON string into a dict.
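For reference, the per-element logic that ExtractData presumably performs can be sketched without the Beam dependency; the exact payload fields (`sensor_id`, `value`) are assumptions for illustration, not taken from the actual messages:

```python
import json

def extract_data(message: bytes) -> dict:
    """Decode a Pub/Sub payload (UTF-8 JSON string) into a dict.

    Sketch of what ExtractData.process presumably does per element;
    the field names below are hypothetical.
    """
    return json.loads(message.decode("utf-8"))

def key_by_sensor(elem: dict):
    """The keying step used before GroupByKey in the pipeline above."""
    return (elem["sensor_id"], elem)
```

In the real pipeline these would run inside a DoFn's `process` method and the subsequent `Map`, respectively.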