I am in the process of implementing a proof-of-concept stream processing system using Apache Flink 1.6.0 and am storing a list of received events, partitioned by key, in a ListState. (Don't worry about why I am doing this; just work with me here.) I have a StateTtlConfig set on the corresponding ListStateDescriptor. Per the documentation:
- "All state collection types support per-entry TTLs. This means that list elements and map entries expire independently."
- "Currently, expired values are only removed when they are read out explicitly, e.g. by calling ValueState.value()."
Question 1
Which of the following constitutes a read of the ListState?
- Obtaining the Iterable from get() but never iterating over it: myListState.get();
- Actually iterating over it: for (MyItem i : myListState.get()) { ... }
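To make the distinction in Question 1 concrete, here is a hypothetical, stdlib-only toy model. It is NOT Flink's TtlListState; the class ReadSemanticsModel, its manual clock, and the methods getEager and getLazy are invented for illustration. It contrasts two ways a TTL-aware list could define a "read": eagerly, where calling get() itself purges expired entries, or lazily, where nothing happens until the returned Iterable is actually iterated. It also assumes an entry expires once its age reaches the TTL.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical toy model, NOT Flink's TtlListState: contrasts two ways a
// TTL-aware list could define what counts as a "read".
class ReadSemanticsModel {

    // An element paired with the (manual-clock) time at which it was added.
    static final class Entry {
        final char value;
        final long addedAt;

        Entry(char value, long addedAt) {
            this.value = value;
            this.addedAt = addedAt;
        }
    }

    private final List<Entry> backing = new ArrayList<>();
    private final long ttl;
    private long now = 0; // manual clock in seconds (assumption: no real time)

    ReadSemanticsModel(long ttl) { this.ttl = ttl; }

    void advance(long seconds) { now += seconds; }
    void add(char c) { backing.add(new Entry(c, now)); }
    int storedSize() { return backing.size(); }

    // Assumption: an entry is expired once its age reaches the TTL.
    private boolean expired(Entry e) { return now - e.addedAt >= ttl; }

    // Eager reading: calling get() itself purges expired entries.
    Iterable<Character> getEager() {
        backing.removeIf(this::expired);
        List<Character> snapshot = new ArrayList<>();
        for (Entry e : backing) snapshot.add(e.value);
        return snapshot;
    }

    // Lazy reading: get() does no work; expired entries are removed only
    // as an iterator walks past them.
    Iterable<Character> getLazy() {
        return () -> new Iterator<Character>() {
            private final Iterator<Entry> it = backing.iterator();
            private Character pending; // next live value, not yet returned

            @Override
            public boolean hasNext() {
                while (pending == null && it.hasNext()) {
                    Entry e = it.next();
                    if (expired(e)) {
                        it.remove(); // clean up the expired entry in place
                    } else {
                        pending = e.value;
                    }
                }
                return pending != null;
            }

            @Override
            public Character next() {
                if (!hasNext()) throw new NoSuchElementException();
                Character result = pending;
                pending = null;
                return result;
            }
        };
    }

    public static void main(String[] args) {
        ReadSemanticsModel lazy = new ReadSemanticsModel(10);
        lazy.add('x');
        lazy.advance(15);
        lazy.getLazy();                         // obtain the Iterable only
        System.out.println(lazy.storedSize());  // still 1
        for (char ignored : lazy.getLazy()) { } // actually iterate
        System.out.println(lazy.storedSize());  // now 0

        ReadSemanticsModel eager = new ReadSemanticsModel(10);
        eager.add('x');
        eager.advance(15);
        eager.getEager();                       // get() alone purges
        System.out.println(eager.storedSize()); // 0
    }
}
```

In the lazy variant, merely calling getLazy() leaves the expired entry in storage and only iterating removes it; in the eager variant, get() alone is enough. The model only frames the two possibilities and makes no claim about which one Flink 1.6 implements.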
Question 2
What does "per-entry TTL" actually mean? Specifically:
Assume I have a specific instance of ListState<Character> whose descriptor has a TTL of 10 seconds. I insert 'a'. Two seconds later, I insert 'b'. Nine seconds after that (11 seconds after 'a'), I insert 'c'. If I then iterate over this ListState, which items will be returned?
In other words:
ListState<Character> ls = getRuntimeContext().getListState(myDescriptor);
ls.add('a'); // t = 0 s
// ...two seconds later (t = 2 s)...
ls.add('b');
// ...nine seconds later (t = 11 s)...
ls.add('c');
// Does this iterate over 'a', 'b', 'c'
// or just 'b' and 'c'?
for (Character myChar : ls.get()) { ... }
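As a sanity check on the arithmetic, here is a hypothetical, stdlib-only replay of that timeline. TtlTimeline, perEntry and wholeList are invented names, not Flink APIs. It contrasts the per-entry reading suggested by the documentation quote, where each element carries its own timestamp, with a whole-list reading, where every add() refreshes a single timestamp for the entire list. It assumes an element expires once its age reaches the TTL; the result for this timeline does not depend on that boundary, because the ages at read time are 11, 9 and 0 seconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, NOT Flink's implementation: replays the
// 'a' / 'b' / 'c' timeline under two readings of "TTL on a list".
class TtlTimeline {

    static final long TTL = 10; // seconds, as in the question

    // Per-entry reading: every element keeps its own write timestamp.
    static List<Character> perEntry(long[] writeTimes, char[] values, long readAt) {
        List<Character> live = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            // Assumption: an element counts as expired once its age reaches the TTL.
            if (readAt - writeTimes[i] < TTL) {
                live.add(values[i]);
            }
        }
        return live;
    }

    // Whole-list reading: one timestamp for the list, refreshed by every add.
    // writeTimes must be in chronological order, so the last one is the newest.
    static List<Character> wholeList(long[] writeTimes, char[] values, long readAt) {
        long lastWrite = writeTimes[writeTimes.length - 1];
        List<Character> live = new ArrayList<>();
        if (readAt - lastWrite < TTL) {
            for (char v : values) {
                live.add(v);
            }
        }
        return live;
    }

    public static void main(String[] args) {
        long[] times = {0, 2, 11};        // 'a' at t=0, 'b' at t=2, 'c' at t=11
        char[] values = {'a', 'b', 'c'};
        long readAt = 11;                 // iterate right after adding 'c'
        System.out.println("per-entry:  " + perEntry(times, values, readAt));
        System.out.println("whole-list: " + wholeList(times, values, readAt));
    }
}
```

Running it prints "per-entry:  [b, c]" and "whole-list: [a, b, c]". It only shows what each reading would return; it does not establish which one Flink 1.6's ListState with a StateTtlConfig actually follows.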