4
votes

I need to do a sliding count over a large volume of messages using keyed state and Flink's TimerService. The sliding size is one and the window size is larger than 10 hours. The problem I've run into is that checkpointing takes a long time. To improve performance we enabled incremental checkpoints, but checkpoints are still slow. We found that most of the time is spent serializing the timers that are used to clean up old data. We have a timer for each key, and there are about 300M timers in all.

Any suggestion for solving this problem would be appreciated. Or could we do the count another way?

————————————————————————————————————————————

I'd like to add some details to the situation. The sliding size is one event and the window size is more than 10 hours (there are about 300 events per second), and we need to react to each event. For that reason we did not use the windows provided by Flink; we use keyed state to store the previous information instead. The timers are used in a ProcessFunction to trigger the cleanup of old data. Finally, the number of distinct keys is very large.
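To make this concrete, here is a simplified sketch of what we do per key (the Event type, the state name, and the details of the cleanup delay are illustrative, not our real code):

new ProcessFunction<Event, Long>() {

  private transient ValueState<Long> countState;

  @Override
  public void open(Configuration parameters) {
    countState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("count", Long.class));
  }

  @Override
  public void processElement(Event event, Context ctx, Collector<Long> out)
      throws Exception {
    Long count = countState.value();
    countState.update(count == null ? 1L : count + 1);
    out.collect(countState.value());
    // one cleanup timer per key (in the real job we track and replace it,
    // so each key keeps exactly one) -> ~300M timers across all keys
    ctx.timerService().registerProcessingTimeTimer(
        ctx.timerService().currentProcessingTime() + 10 * 60 * 60 * 1000L);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) {
    countState.clear();
  }
}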

Could you provide a more detailed description? I tried to answer you, but it's difficult without more details. – diegoreico
Please clarify the situation. The sliding size is "one" what? One hour, one minute, or one event? Into how many different windows is each event being assigned? How does the windowing relate to the timers in question (are you talking about the timers Flink uses for timeWindow, or something in a ProcessFunction)? Are there actually 300M distinct keys? – David Anderson
Thanks for your attention. I added some details to the situation. I hope that clarifies the question. – Barry Bai

3 Answers

3
votes

I think this should work:

Dramatically reduce the number of keys Flink is working with from 300M down to 100K (for example), by effectively doing something like keyBy(key mod 100000). Your ProcessFunction can then use a MapState (where the keys are the original keys) to store whatever it needs.

MapStates have iterators, which you can use to periodically crawl each of these maps to expire old items. Stick to the principle of having only one timer per key (per uberkey, if you will), so that you only have 100K timers.
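A rough sketch of this idea (untested; the Event and Result types, the getKey() accessor, and the CLEANUP_INTERVAL_MS / WINDOW_MS constants are placeholders):

// CLEANUP_INTERVAL_MS and WINDOW_MS are assumed constants
events
    .keyBy(e -> Math.floorMod(e.getKey().hashCode(), 100_000))  // ~100K "uberkeys"
    .process(new ProcessFunction<Event, Result>() {

      // original key -> timestamp of its most recent update
      private transient MapState<String, Long> lastSeen;

      @Override
      public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("lastSeen", String.class, Long.class));
      }

      @Override
      public void processElement(Event e, Context ctx, Collector<Result> out)
          throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        if (!lastSeen.iterator().hasNext()) {
          // first entry for this uberkey: register its single cleanup timer
          ctx.timerService().registerProcessingTimeTimer(now + CLEANUP_INTERVAL_MS);
        }
        lastSeen.put(e.getKey(), now);
        // ... per-original-key counting logic goes here ...
      }

      @Override
      public void onTimer(long ts, OnTimerContext ctx, Collector<Result> out)
          throws Exception {
        // crawl the map and expire entries older than the window
        Iterator<Map.Entry<String, Long>> it = lastSeen.iterator();
        while (it.hasNext()) {
          if (ts - it.next().getValue() > WINDOW_MS) {
            it.remove();
          }
        }
        // refresh the timer only while this uberkey still holds data
        if (lastSeen.iterator().hasNext()) {
          ctx.timerService().registerProcessingTimeTimer(ts + CLEANUP_INTERVAL_MS);
        }
      }
    });

Each uberkey's map then holds on the order of 3,000 original keys (300M / 100K), and the timer state shrinks from ~300M entries to at most 100K.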

UPDATE:

Flink 1.6 included FLINK-9485, which allows timers to be checkpointed asynchronously, and to be stored in RocksDB. This makes it much more practical for Flink applications to have large numbers of timers.
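If I remember the docs correctly, with the RocksDB state backend you opt into this via flink-conf.yaml (double-check the option for your exact version):

# keep timer state in RocksDB rather than on the heap, so it is
# snapshotted asynchronously along with the rest of the keyed state
state.backend.rocksdb.timer-service.factory: ROCKSDB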

0
votes

What if, instead of using timers, you add an extra field to every element of your stream that stores the current processing time or the arrival time? Then, when you want to clean old data out of your stream, you just apply a filter operator that checks whether the data is old enough to be deleted.
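For example (a sketch; the Event type, the stream names, and the 10-hour threshold are placeholders):

// stamp each element with its arrival time on the way in ...
DataStream<Tuple2<Event, Long>> stamped = events
    .map(e -> Tuple2.of(e, System.currentTimeMillis()))
    .returns(Types.TUPLE(Types.GENERIC(Event.class), Types.LONG));

// ... and later drop anything older than the window with a plain filter
DataStream<Tuple2<Event, Long>> fresh = stamped
    .filter(t -> System.currentTimeMillis() - t.f1 <= 10 * 60 * 60 * 1000L);

Note that this prunes elements flowing through the pipeline rather than state already held inside an operator, so it fits best when the old data lives in the stream itself.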

0
votes

Rather than registering a clearing timer on each event, how about registering a timer only once per period, e.g. once per minute? You could register it the first time a key is seen and refresh it in onTimer. Something like:

new ProcessFunction<SongEvent, Object>() {

  // elided fields: a ValueState<Boolean> "state" tracking whether a timer
  // is already registered for the current key, and "elements", the keyed
  // state holding the buffered events
  ...

  @Override
  public void processElement(
      SongEvent songEvent,
      Context context,
      Collector<Object> collector) throws Exception {

    // register a cleanup timer only if this key doesn't have one yet
    Boolean isTimerRegistered = state.value();
    if (isTimerRegistered == null || !isTimerRegistered) {
      long time = context.timerService().currentProcessingTime() + 60_000; // e.g. one minute
      context.timerService().registerProcessingTimeTimer(time);
      state.update(true);
    }

    // Standard processing

  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out)
      throws Exception {
    pruneElements(timestamp);

    // refresh the timer while there is still data to expire for this key
    if (!elements.isEmpty()) {
      ctx.timerService().registerProcessingTimeTimer(timestamp + 60_000);
    } else {
      state.clear();
    }
  }
}

Something similar is implemented for the Flink SQL OVER clause. You can have a look here.