We're consuming multiple Kafka topics but want to give precedence to some of them (roughly, Quality of Service).
From what I've found online, the consensus is to throttle in the source rather than in operators, more specifically in the deserializer [1].
How can we access information about the state of the streaming environment (i.e. how far each topic lags behind its latest offset) from within the source?
Currently, we plan to convert our whole setup to CoFlatMaps [2] with a control stream that emits the current offset lag for all topics; operators on the low-precedence streams would then sleep according to the lag of the high-precedence streams.
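To make the plan above concrete, here is a rough sketch of the back-off logic we have in mind, written as plain Java without any Flink API. Everything in it is made up for illustration: the class name `LagThrottle`, the `recordsPerMilli` scaling factor, and the `maxSleepMillis` cap are assumptions, not something we have running.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: maps the lag of high-precedence topics to a sleep time. */
final class LagThrottle {
    // Latest known lag (in records) per high-precedence topic.
    private final Map<String, Long> lagByTopic = new ConcurrentHashMap<>();
    // Assumed tuning knob: how many records of lag cost 1 ms of sleep.
    private final long recordsPerMilli;
    // Assumed upper bound so a huge lag cannot stall the low-precedence stream forever.
    private final long maxSleepMillis;

    LagThrottle(long recordsPerMilli, long maxSleepMillis) {
        if (recordsPerMilli <= 0 || maxSleepMillis < 0) {
            throw new IllegalArgumentException("invalid throttle settings");
        }
        this.recordsPerMilli = recordsPerMilli;
        this.maxSleepMillis = maxSleepMillis;
    }

    /** Would be called for every element of the control stream. */
    void updateLag(String topic, long lag) {
        lagByTopic.put(topic, Math.max(0L, lag));
    }

    /** Would be called before processing each low-precedence element. */
    long sleepMillis() {
        long worst = 0L;
        for (long lag : lagByTopic.values()) {
            worst = Math.max(worst, lag);
        }
        // Sleep grows linearly with the worst lag, up to the cap.
        return Math.min(maxSleepMillis, worst / recordsPerMilli);
    }
}
```

Inside a `CoFlatMapFunction`, one of `flatMap1`/`flatMap2` would feed control elements into `updateLag`, and the other would call `Thread.sleep(throttle.sleepMillis())` before emitting a low-precedence record. That is exactly the "sleep in an operator" pattern that [1] advises against, which is why we are asking whether the same information can be made available in the source instead.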
How would you solve this problem? TL;DR: Is there a way to share information across the sources/deserializers of a TaskManager?