0
votes

in aggregation to this question I'm still not having clear why the checkpoints of my Flink job grows and grows over time and at the moment, for about 7 days running, these checkpoints never gets the plateau. I'm using Flink 1.10 version at the moment, FS State Backend as my job cannot afford the latency costs of using RocksDB.

See the checkpoints evolve over 7 days: enter image description here Let's say that I have this configuration for the TTL of the states in all my stateful operators for one hour or maybe more than that and a day in one case:

public static final StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupFullSnapshot().build();

In my concern all the objects into the states will be cleaned up after the expires time and therefore the checkpoints size should be reduced, and as we expect more or less the same amount of data everyday.

In the other hand we have a traffic curve, which has more incoming data in some hours of the day, but late night the traffic goes down and all the objects into the states that expires should be cleaned up causing that the checkpoint size should be reduced not kept with the same size until the traffic goes up again.

Let's see this code sample of one use case:

DataStream<Event> stream = addSource(source);
KeyedStream<Event, String> keyedStream = stream.filter((FilterFunction<Event>) event ->
                    apply filters here;))
                    .name("Events filtered")
                    .keyBy(k -> k.rType.equals("something") ? k.id1 : k.id2);
keyedStream.flatMap(new MyFlatMapFunction())


public class MyFlatMapFunction extends RichFlatMapFunction<Event, Event>{
private final MapStateDescriptor<String, Event> descriptor = new MapStateDescriptor<>("prev_state", String.class, Event.class);
private MapState<String, Event> previousState;

@Override
    public void open(Configuration parameters) {
        /*ttlConfig described above*/
        descriptor.enableTimeToLive(ttlConfig);
        previousState = getRuntimeContext().getMapState(descriptor);
    }

@Override
    public void flatMap(Event event, Collector<Event> collector) throws Exception {
      final String key = event.rType.equals("something") ? event.id1 : event.id2;
      Event previous = previousState.get(key);
      if(previous != null){
        /*something done here*/
      }else /*something done here*/
        previousState.put(key, previous);
        collector.collect(previous);
 }
}

More or less these is the structure of the use cases, and some others that uses Windows(Time Window or Session Window)

Questions:

  • What am I doing wrong here?
  • Are the states cleaned up when they expires and this scenario which is the same of the rest of the use cases?
  • What can help me to fix the checkpoint size if they are working wrong?
  • Is this behaviour normal?

Kind regards!

1
Is the JVM heap growing in a similar fashion as well, or just the checkpoint sizes? - David Anderson
No, JVM Heap is fine. Which means that the GC is working as expected right? Thanks. - Alejandro Deulofeu
I'm unable to formulate a theory of your application that explains all of the facts you've shared. Something doesn't add up. Given that I don't understand the situation well enough, I hesitate to offer any advice. - David Anderson
There's an experiment you could do that might shed some light on what's going on with the checkpoints growing in size. If you restore (a copy of) the job from a checkpoint, and disabled the input(s), then after an hour the checkpoint size should drop to zero. - David Anderson
Allows me to ask you another question: My job consist in to read events from RabbitMQ -> Transformations -> Sink, I'm not using Restart Strategies because if my job fails SVC service will manage the automatic restart of the job and it will start from scratch. Starting from this operational mode, checkpoints makes no senses to me, because they are not been used to any restart in fail cases, and according to my understanding they are worthy if Flink application crashes and then restart from checkpoints, but as my job always start from scratch: Do I need the checkpoints in my scenario? - Alejandro Deulofeu

1 Answers

2
votes

In this stretch of code it appears that you are simply writing back the state that was already there, which only serves to reset the TTL timer. This might explain why the state isn't being expired.

Event previous = previousState.get(key);
if (previous != null) {
  /*something done here*/
} else
  previousState.put(key, previous);

It also appears that you should be using ValueState rather than MapState. ValueState effectively provides a sharded key/value store, where the keys are the keys used to partition the stream in the keyBy. MapState gives you a nested map for each key, rather than a single value. But since you are using the same key inside the flatMap that you used to key the stream originally, key-partitioned ValueState would appear to be all that you need.