I'm trying to aggregate (per key) a streaming data source in Apache Beam (via Scio) using a stateful DoFn (using `@ProcessElement` with `@StateId` `ValueState` elements). I thought this would be most appropriate for the problem I'm trying to solve. The requirements are:
- for a given key, records are aggregated (essentially summed) across all time; I don't care about previously computed aggregates, just the most recent one
- keys may be evicted from the state (`state.clear()`) based on certain conditions that I control
- every 5 minutes, regardless of whether any new keys were seen, all keys that haven't been evicted from the state should be output
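
To make the requirements concrete, here is a minimal plain-Java sketch of the semantics I'm after. It deliberately uses no Beam or Scio APIs: `KeyedSumModel`, `process`, `flush`, and the "evict once the sum reaches 100" rule are all hypothetical names and values chosen for illustration. In a real pipeline the map would presumably be the per-key `ValueState`, and something would have to call the equivalent of `flush` every 5 minutes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

/** Plain-Java model of the desired behaviour; not Beam code. All names are hypothetical. */
class KeyedSumModel {
    // Stands in for the per-key ValueState: one running sum per live key.
    private final Map<String, Long> state = new LinkedHashMap<>();
    // Stands in for "conditions that I control": true means the key should be evicted.
    private final BiPredicate<String, Long> shouldEvict;

    KeyedSumModel(BiPredicate<String, Long> shouldEvict) {
        this.shouldEvict = shouldEvict;
    }

    /** Analogue of @ProcessElement: fold one record into its key's running sum. */
    void process(String key, long value) {
        long updated = state.getOrDefault(key, 0L) + value;
        if (shouldEvict.test(key, updated)) {
            state.remove(key); // analogue of state.clear()
        } else {
            state.put(key, updated);
        }
    }

    /** Analogue of the 5-minute tick: a snapshot of the latest sum for every live key. */
    Map<String, Long> flush() {
        return new LinkedHashMap<>(state);
    }

    public static void main(String[] args) {
        // Hypothetical eviction rule: drop a key once its sum reaches 100.
        KeyedSumModel model = new KeyedSumModel((key, sum) -> sum >= 100);
        model.process("a", 1);
        model.process("b", 5);
        model.process("a", 2);
        System.out.println(model.flush()); // {a=3, b=5}
        model.process("b", 95);            // b reaches 100 and is evicted
        System.out.println(model.flush()); // {a=3}
        System.out.println(model.flush()); // {a=3} again: emitted even without new input
    }
}
```

The important parts are that each flush emits only the latest value per key (not the history), that evicted keys disappear from later flushes, and that a flush still emits every live key when no new records have arrived.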
Given that this is a streaming pipeline that will run indefinitely, using a `combinePerKey` over a global window with accumulating fired panes seems like it will keep increasing its memory footprint and the amount of data it needs to process over time, so I'd like to avoid it. Additionally, when I tested this, it (maybe as expected) simply appended the newly computed aggregates to the output along with the historical input, rather than using the latest value for each key.
My thought was that using a stateful DoFn would simply allow me to output all of the global state up until now(), but it seems this isn't trivial. I've seen hints about using timers to artificially execute callbacks for this, as well as potentially using a slowly growing side input map (How to solve Duplicate values exception when I create PCollectionView<Map<String,String>>) and somehow flushing it, but that would essentially require iterating over all values in the map rather than joining on it.
I feel like I might be overlooking something simple to get this working. I'm relatively new to many of the windowing and timer concepts in Beam, so I'm looking for any advice on how to solve this. Thanks!