
I have a streaming job that listens to events and processes them using CEP.

The flow is:

stream = source
           .assignTimestampsAndWatermarks(...)
           .filter(...);

 CEP
   .pattern(stream.keyBy(e -> e.getId()), pattern)
   .process(new PatternMatchProcessFunction())
   .addSink(...);

The keys are all short-lived, and the process function doesn't hold any state of its own that could be removed by setting a TTL. The job uses the EventTime time characteristic.

My question: how does Flink handle expired keys, and do they have any impact on GC? If Flink removes the keys itself, how often does this happen?

I'm facing GC issues: the job gets stuck about 3 hours after deployment. I'm doing memory tuning, but I want to rule out this cause.

Comment (Yuval Itzchakov): Which state backend are you using?
Reply (ardhani): FileSystem state backend.

Answer

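With FsStateBackend, the working state of your CEP operator is kept in memory, as objects on the TaskManager's JVM heap; only checkpoints are written to the file system. So anything the operator buffers is directly subject to GC.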

For CEP, Flink buffers incoming elements per key in a MapState<Long, List<T>> that maps each timestamp to all elements that arrived with that timestamp. When a watermark arrives, Flink processes the buffered events as follows (these steps are the comments in CepOperator.onEventTime):

// 1) get the queue of pending elements for the key and the corresponding NFA,
// 2) process the pending elements in event time order and custom comparator if exists by feeding them in the NFA
// 3) advance the time to the current watermark, so that expired patterns are discarded.
// 4) update the stored state for the key, by only storing the new NFA and MapState iff they have state to be used later.
// 5) update the last seen watermark.
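To make the buffering concrete, here is a small plain-Java model of it. It is a sketch under simplifying assumptions, not Flink code: a per-key TreeMap<Long, List<String>> stands in for the operator's MapState<Long, List<T>>, and CepBufferModel, onElement and onWatermark are hypothetical names. It only models the element buffer, not the NFA's partial matches: elements wait until a watermark passes their timestamp, are then drained in timestamp order, and a key's entry is dropped once nothing is left for it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of a keyed CEP operator's element buffer.
// Not Flink's API: a TreeMap per key stands in for MapState<Long, List<T>>.
final class CepBufferModel {
    // key -> (event timestamp -> events that arrived with that timestamp)
    private final Map<String, TreeMap<Long, List<String>>> buffers = new HashMap<>();
    // Events handed to the (not modelled) NFA, in the order they were processed.
    final List<String> processed = new ArrayList<>();

    // Elements are only buffered on arrival; nothing is matched yet.
    void onElement(String key, long timestamp, String event) {
        buffers.computeIfAbsent(key, k -> new TreeMap<>())
               .computeIfAbsent(timestamp, t -> new ArrayList<>())
               .add(event);
    }

    // On a watermark, drain every element with timestamp <= watermark, in timestamp order.
    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, TreeMap<Long, List<String>>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            TreeMap<Long, List<String>> queue = it.next().getValue();
            // headMap(..., true) is a live view, so clearing it removes the drained timestamps.
            NavigableMap<Long, List<String>> ready = queue.headMap(watermark, true);
            for (List<String> events : ready.values()) {
                processed.addAll(events);
            }
            ready.clear();
            // Keep the key's buffer only if something still waits for a later watermark.
            if (queue.isEmpty()) {
                it.remove();
            }
        }
    }

    int bufferedKeys() {
        return buffers.size();
    }

    public static void main(String[] args) {
        CepBufferModel model = new CepBufferModel();
        model.onElement("a", 5L, "a@5");
        model.onElement("a", 1L, "a@1");
        model.onElement("b", 2L, "b@2");
        model.onWatermark(3L);  // drains a@1 and b@2; key "a" still holds a@5
        System.out.println(model.bufferedKeys()); // prints 1
        model.onWatermark(10L); // drains a@5; no key keeps a buffer
        System.out.println(model.bufferedKeys()); // prints 0
    }
}
```

In this model, every key that has received events keeps its buffer on the heap until a watermark passes those events, however short-lived the key is.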

Once the buffered events have been processed, Flink advances the NFA's time to the current watermark, which discards expired partial matches (you can see this in NFA.advanceTime). This means that eviction of elements from your state depends on how often watermarks are generated and propagated through your stream.
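The effect of advanceTime can be modelled the same way. This is again a hypothetical sketch rather than Flink's implementation: it assumes the pattern declares a within(...) window, treats a partial match as expired once watermark - startTimestamp >= window, and uses made-up names (PruningModel, PartialMatch). Pruning runs only when advanceTime is called, i.e. when a watermark arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of time-based pruning of partial matches, loosely following the idea
// behind NFA.advanceTime. Not Flink code: names and the expiry rule are simplified assumptions.
final class PruningModel {
    static final class PartialMatch {
        final String key;
        final long startTimestamp; // event time of the first event in the partial match

        PartialMatch(String key, long startTimestamp) {
            this.key = key;
            this.startTimestamp = startTimestamp;
        }
    }

    private final long windowMillis; // assumed length of pattern.within(...); 0 means no window
    final List<PartialMatch> live = new ArrayList<>();

    PruningModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Called only when a watermark arrives; returns how many partial matches were discarded.
    int advanceTime(long watermark) {
        if (windowMillis <= 0) {
            return 0; // without a window, nothing expires by time in this model
        }
        int before = live.size();
        live.removeIf(m -> watermark - m.startTimestamp >= windowMillis);
        return before - live.size();
    }

    public static void main(String[] args) {
        PruningModel model = new PruningModel(100L);
        model.live.add(new PartialMatch("a", 0L));
        model.live.add(new PartialMatch("b", 50L));
        model.live.add(new PartialMatch("c", 120L));
        System.out.println(model.advanceTime(90L));  // prints 0: no match is 100 ms old yet
        System.out.println(model.advanceTime(150L)); // prints 2: matches started at 0 and 50 expire
    }
}
```

Nothing is discarded between watermarks, however old the data is, so sparse or stalled watermarks let partial matches accumulate on the heap. In this model, a pattern without a window (windowMillis of 0) never expires anything by time.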