
Thank you for taking the time to read this. I would like to ask the experts about the state TTL feature in Flink 1.8.0. Even after reading the documentation below, it is still unclear to me.

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl

I want to understand what the TTL applies to: the key or the value. In particular, say I have a mapState structure like this:

mapState = Map[String,List[String]]
e.g. val mapState = Map("haha" -> List("foo","bar")) in Scala
where "haha" is the key of the mapState and List("foo","bar") is the value

Suppose I set a TTL of 1 minute on the mapState through the StateTtlConfig and then, less than 1 minute later, write to one of the values in the List, say "foo".

After 1 minute, when the TTL fires, does the key "haha" expire, or does the value "bar" expire?

In other words, if the TTL applies to the key, my understanding is that the mapState would remain intact:

mapState = Map("haha" -> List("foo","bar"))

because writing to the value "foo" would reset the TTL on the key, so the whole mapState stays the same.

Alternatively, if the TTL applies to the value, the mapState would become:

mapState = Map("haha" -> List("foo"))

because the value "bar" would expire after 1 minute without being accessed.

Hopefully I have made the question clear. Thank you in advance for any help.


1 Answer


The code for accessing state with TTL looks as follows:

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
            SupplierWithException<TtlValue<V>, SE> getter,
            ThrowingConsumer<TtlValue<V>, CE> updater,
            ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getter.get();
        if (ttlValue == null) {
            return null;
        } else if (expired(ttlValue)) {
            stateClear.run();
            if (!returnExpired) {
                return null;
            }
        } else if (updateTsOnRead) {
            updater.accept(rewrapWithNewTs(ttlValue));
        }
        return ttlValue;
    }

The method that checks whether the value has expired looks like this:

    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
        return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
    }

This means that, in your case, the TTL is checked for the whole list stored under a map key, not for its individual elements. So, depending on the StateTtlConfig, either the whole list expires or the whole list stays.

The available update types are OnCreateAndWrite and OnReadAndWrite. So, for the behavior to be consistent, you need to call put() on the MapState whenever you want to update your value list, so that the write goes through the state and the timestamp of that entry is refreshed.
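To make the behavior described above concrete, here is a minimal, self-contained sketch in plain Java. It is not Flink code: the class TtlMapSketch, its nested Stamped class, and the explicit "now" parameter are hypothetical names introduced only for illustration, and the expiry condition (age >= TTL) is an assumption of this model. It only mimics the idea that one timestamp is stored per map entry alongside the whole value, and that put() (and optionally a read) refreshes it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for a TTL-wrapped map state; NOT Flink's implementation. */
class TtlMapSketch<K, V> {

    /** A whole value (e.g. the entire list) plus the timestamp of its last counted access. */
    private static final class Stamped<V> {
        final V value;
        final long lastAccess;

        Stamped(V value, long lastAccess) {
            this.value = value;
            this.lastAccess = lastAccess;
        }
    }

    private final Map<K, Stamped<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final boolean updateTsOnRead; // true models OnReadAndWrite, false models OnCreateAndWrite

    TtlMapSketch(long ttlMillis, boolean updateTsOnRead) {
        this.ttlMillis = ttlMillis;
        this.updateTsOnRead = updateTsOnRead;
    }

    /** Writing an entry stamps the whole value with the given time. */
    void put(K key, V value, long now) {
        entries.put(key, new Stamped<>(value, now));
    }

    /** Returns the value, or null if the entry is missing or has expired. */
    V get(K key, long now) {
        Stamped<V> stamped = entries.get(key);
        if (stamped == null) {
            return null;
        }
        if (now - stamped.lastAccess >= ttlMillis) { // assumed expiry condition for this model
            entries.remove(key); // the whole entry goes away, never a single list element
            return null;
        }
        if (updateTsOnRead) {
            entries.put(key, new Stamped<>(stamped.value, now));
        }
        return stamped.value;
    }

    public static void main(String[] args) {
        TtlMapSketch<String, List<String>> state = new TtlMapSketch<>(60_000L, false);
        state.put("haha", Arrays.asList("foo", "bar"), 0L);
        // Replacing the list via put() at t = 30s refreshes the timestamp of the whole entry.
        state.put("haha", Arrays.asList("foo2", "bar"), 30_000L);
        System.out.println(state.get("haha", 70_000L)); // [foo2, bar]
        System.out.println(state.get("haha", 95_000L)); // null
    }
}
```

In this model "bar" never expires on its own: the list under "haha" is either returned complete or is gone entirely, and only a put() (or a read, when updateTsOnRead is true) postpones the expiry.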