I am new to Beam/Dataflow and am trying to figure out whether it is suited to this problem. I am trying to keep a running sum, per message type, of the messages currently backlogged in a queueing system. The system uses a monotonically increasing offset number to order messages: producers learn the number when they send a message, and consumers track the watermark offset as they process each message in FIFO order. This pipeline would have two inputs: counts from the producers and watermarks from the consumers.

The queue producer would regularly flush a batch of count metrics to Beam:

(type1, offset, count)
(type2, offset, count)
...

where offset is the last offset the producer wrote for typeN, and count is how many typeN messages it enqueued in the current batch period.

The queue consumer will regularly send its latest consumed watermark offset. The effect this should have is to invalidate any counts that have an offset lower than this consumer watermark.

The output of the pipeline is the sum of all counts with a higher offset than the largest consumer watermark yet seen, grouped by message type (snapshotted every 5 minutes or so).
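To make the intended output concrete, here is a minimal batch sketch of that computation in plain Python. It is not Beam code; the function name `backlog_by_type` and the sample numbers are made up for illustration, and it assumes a count whose offset equals the watermark is treated as consumed.

```python
from collections import defaultdict


def backlog_by_type(count_records, watermarks):
    """Sum counts per message type for records above the highest watermark.

    count_records: iterable of (msg_type, offset, count) from producers.
    watermarks: iterable of consumer watermark offsets, in any order.
    """
    # Only the largest watermark seen so far matters.
    high = max(watermarks, default=-1)
    totals = defaultdict(int)
    for msg_type, offset, count in count_records:
        # Assumption: offset == watermark counts as already consumed.
        if offset > high:
            totals[msg_type] += count
    return dict(totals)


records = [
    ("type1", 100, 5),
    ("type2", 105, 3),
    ("type1", 120, 7),
    ("type2", 130, 2),
]
print(backlog_by_type(records, [90, 110]))  # {'type1': 7, 'type2': 2}
```

Note that a whole batch is kept or dropped based on its last offset, so the result is approximate while the watermark sits inside a batch.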

(Of course there would be 100k message "types", hundreds of producer servers, occasional 2-hour periods where the consumer doesn't report an advancing watermark, etc.)

Is this doable? The part that seems maybe unsuited to Beam is that this pipeline would need to maintain and scan an unbounded-ish history of count records.

For each key (message type), you would like an output that counts how many messages are in the queue since the last seen 'watermark'? - Reza Rokni
Yeah, for each type of message seen in the queueing system, how many of that type are still in there? (How many have not been effectively marked processed by a high enough watermark.) - David Grant

1 Answer

One possible approach would be to model this as two timeseries (left, right) where you want to match left.timestamp <= right.timestamp. You can do this using the State and Timer API.

To do this over unbounded input, you will need to work within the GlobalWindow. Important note: in the GlobalWindow state never expires, so you will need to garbage-collect your left and right streams yourself. Also, data will arrive in onProcess unordered, so you will need to use event-time timers to do the actual work.

Very roughly:

onProcess() {
    Store the data in BagState.
    Set an event-time timer to go off
}

onTimer() {
    Do your business logic.
}
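The shape of that logic for a single key can be modelled in plain Python. This is only a sketch of the buffering, garbage collection and timer callback, not Beam API code: the class `TypeBacklogState`, the `("count", ...)` / `("watermark", ...)` record tags, and the way the timer is invoked by hand are all assumptions made for illustration.

```python
class TypeBacklogState:
    """Hypothetical stand-in for the per-key state of a stateful DoFn."""

    def __init__(self):
        self.buffer = []      # plays the role of BagState: unordered (offset, count)
        self.watermark = -1   # largest consumer offset seen so far

    def process(self, record):
        """Buffer the element only; arrival order is not guaranteed."""
        kind, value = record
        if kind == "count":
            self.buffer.append(value)
        elif kind == "watermark":
            self.watermark = max(self.watermark, value)
        else:
            raise ValueError(f"unknown record kind: {kind!r}")

    def on_timer(self):
        """Called when the timer fires: garbage-collect, then emit the backlog."""
        # Drop counts at or below the watermark so state does not grow forever.
        self.buffer = [(o, c) for o, c in self.buffer if o > self.watermark]
        return sum(c for _, c in self.buffer)


state = TypeBacklogState()
# Elements deliberately arrive out of order.
for rec in [("count", (120, 7)), ("watermark", 110),
            ("count", (100, 5)), ("watermark", 90)]:
    state.process(rec)
print(state.on_timer())  # 7
```

In a real pipeline the consumer watermark would also have to reach every key, for example by fanning it out or reading it as a side input; that part is left out here.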

This is a lot easier with Apache Beam > 2.24.0, as OrderedListState has been added.
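Why ordered state helps can be seen in a small plain-Python analogy: if the buffered records are kept sorted, everything at or below the watermark is a prefix that can be dropped in one step instead of rewriting the whole bag. The class `SortedBacklog` below is hypothetical and sorts by offset; as far as I understand, OrderedListState itself sorts by element timestamp, so treat this as an illustration of the idea rather than of the Beam API.

```python
import bisect


class SortedBacklog:
    """Keeps (offset, count) records sorted by offset for cheap pruning."""

    def __init__(self):
        self._records = []

    def add(self, offset, count):
        # insort keeps the list ordered even when records arrive out of order.
        bisect.insort(self._records, (offset, count))

    def advance(self, watermark):
        # Everything with offset <= watermark forms a prefix of the list.
        cut = bisect.bisect_right(self._records, (watermark, float("inf")))
        del self._records[:cut]

    def total(self):
        return sum(count for _, count in self._records)


backlog = SortedBacklog()
backlog.add(120, 7)
backlog.add(100, 5)
backlog.add(110, 1)
backlog.advance(110)
print(backlog.total())  # 7
```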

Although the timeseries use case is different from the one in this question, this talk from the 2019 Beam Summit also has some pointers (but does not make use of OrderedListState, which was not available at the time):

State and Timer API and Timeseries