In a Flink job, I want to delete state from memory 24 hours after it is created. I checked this post and set up state time-to-live (TTL), but as mentioned in this article, state removal is lazy/passive, which may lead to a memory leak.
For example, suppose that after 23 hours and 57 minutes I receive the last message for key ('USA', 'Male', 2018), and no more messages ever arrive for this key. The function is then never invoked again for that key, so the TTL check never runs and the state for ('USA', 'Male', 2018) is kept in memory forever.
This article mentions using a timer:
The idea is to register a timer with the TTL per state value and access. When the timer elapses, the state can be cleared if no other state access happened since the timer was registered.
but I am not able to figure out how to do that.
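To check that I at least understand the idea itself, here is how I read it as a plain-Java model. This is only a sketch and uses no Flink API: `TimerTtlStore`, `put`, `advanceTo` and `contains` are names I made up, and `advanceTo` assumes a clock that moves forward independently of incoming messages, which is exactly the part I am unsure about in Flink.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Plain-Java model of "one timer per state access" (hypothetical, not Flink API).
 * Every write records the access time and registers a timer at access time + TTL.
 * When a timer fires, the entry is cleared only if no later access happened.
 */
class TimerTtlStore<K, V> {
    private final long ttlMillis;
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Long> lastAccess = new HashMap<>();
    // Fire time -> keys that have a timer due at that time.
    private final TreeMap<Long, Set<K>> timers = new TreeMap<>();

    TimerTtlStore(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores a value, remembers when it was accessed, and registers a timer. */
    void put(K key, V value, long now) {
        values.put(key, value);
        lastAccess.put(key, now);
        timers.computeIfAbsent(now + ttlMillis, t -> new HashSet<>()).add(key);
    }

    /** Fires every timer due at or before 'now' (assumed to be driven by a clock, not by messages). */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<K>> due = timers.pollFirstEntry();
            for (K key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    /** Clears the entry only if it was not accessed after this timer was registered. */
    private void onTimer(K key, long fireTime) {
        Long last = lastAccess.get(key);
        if (last != null && last + ttlMillis <= fireTime) {
            values.remove(key);
            lastAccess.remove(key);
        }
    }

    boolean contains(K key) {
        return values.containsKey(key);
    }
}
```

In this model, a write at hour 0 followed by another at 23 hours 57 minutes leaves the entry in place when the first timer fires at hour 24, because the second write moved the last-access time forward. The entry is only removed when the second timer fires at 47 hours 57 minutes.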
I'm thinking about using a ProcessFunction, which has an onTimer() method. My plan is to register a ProcessingTimeTimer in its open() method and delete the state in onTimer(), but I don't know whether this timer is also triggered passively, meaning it won't fire if there is no invocation of the ProcessFunction, even after 24 hours.