From a time-windowed, keyed stream I'd like to get a stream of the largest window seen so far (largest in terms of count of elements).
Currently I have the following code:
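```scala
source
  .keyBy(...)
  .timeWindow(...)
  // count the elements of each window, producing (key, count)
  .fold((DummyKey, 0)) { case ((_, current), key) => (key, current + 1) }
  // global operation: route everything to a single constant key
  .keyBy(_ => ())
  .maxBy(1)
```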
The fold produces a stream of (key, count) elements, one per window. From this stream I want a stream of updates to the "key with the highest count".
I then key by a constant (`keyBy(_ => ())`, since this is a global operation) and apply `maxBy(1)`. This almost works: I get a stream of highest counts, but the current highest count is re-emitted for every incoming element, even when it has not changed.
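To make the repeated emissions concrete, here is a small plain-Scala model of a rolling max-by over the folded (key, count) stream. It is not Flink code: `RollingMaxModel` and `rollingMaxBy` are hypothetical names, and the tie-breaking rule (an equal count keeps the earlier element) is an assumption meant to approximate `maxBy`'s default behavior.

```scala
object RollingMaxModel {
  // Simplified model of a rolling max-by: for every incoming (key, count)
  // pair, emit the pair with the highest count seen so far.
  // Assumption: on a tie, the earlier element is kept.
  def rollingMaxBy[K](in: Seq[(K, Int)]): Seq[(K, Int)] =
    in.scanLeft(Option.empty[(K, Int)]) {
      case (None, next)       => Some(next)
      case (Some(best), next) => if (next._2 > best._2) Some(next) else Some(best)
    }.flatten // drop the initial None, leaving exactly one output per input
}
```

For the input `(a,3), (b,1), (c,5), (a,2)` this returns `(a,3), (a,3), (c,5), (c,5)`: one output per input element, so an unchanged maximum is emitted again each time.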
I think what I'm looking for is some kind of filter-with-previous-value, which would only emit an element when the new value differs from the previous one.
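A minimal sketch of such a filter in plain Scala, assuming the state is simply the last emitted value and that "changed" means "not equal to it". `ChangeFilter`, `changedOnly` and `run` are hypothetical names; `run` only threads the state through a finite `Seq` so the logic can be checked locally.

```scala
object ChangeFilter {
  // Hypothetical filter function of the form
  // (element, previous state) => (keep element?, new state).
  def changedOnly[T](elem: T, prev: Option[T]): (Boolean, Option[T]) =
    prev match {
      case Some(p) if p == elem => (false, prev)       // unchanged: drop it, keep the old state
      case _                    => (true, Some(elem))  // first or changed: emit it and remember it
    }

  // Local driver standing in for a stream: applies changedOnly to each
  // element in order, carrying the state from one element to the next.
  def run[T](in: Seq[T]): Seq[T] =
    in.foldLeft((Vector.empty[T], Option.empty[T])) { case ((out, state), elem) =>
      val (keep, next) = changedOnly(elem, state)
      (if (keep) out :+ elem else out, next)
    }._1
}
```

Fed the repeated maxima `(a,3), (a,3), (c,5), (c,5)`, `run` keeps only `(a,3), (c,5)`. As far as I know, `changedOnly` has the `(T, Option[S]) => (Boolean, Option[S])` shape that `filterWithState` on the Scala API's `KeyedStream` expects, so in the job it could be applied after another `keyBy(_ => ())` following `maxBy(1)`, with Flink holding the previous value in keyed state. This has not been checked against a specific Flink version.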
Is that possible in Flink currently?