We are using Flink 1.6.3 with the RocksDB state backend and incremental checkpoints. Checkpoints are stored in Ceph, and only one checkpoint is retained at a time.

We run windows with an allowed lateness of 3 days, so we expected that no data in the checkpoint shared folder would be kept for more than 3-4 days. Still, we see data that is older than that.
For example, if today is 7/4, there are still some files from 2/4.
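To list exactly which files in the shared folder are older than the 3-day lateness, a small JDK-only sketch like the following can filter them by last-modification time. It assumes the Ceph storage is reachable as a mounted file system (for example through CephFS); the class name OldSharedFiles and the directory argument are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OldSharedFiles {

    // True if lastModified lies before (now - maxAge)
    static boolean isOlderThan(Instant lastModified, Instant now, Duration maxAge) {
        return lastModified.isBefore(now.minus(maxAge));
    }

    // Regular files directly inside dir whose last-modified time is older than maxAge
    static List<Path> filesOlderThan(Path dir, Duration maxAge) throws IOException {
        Instant now = Instant.now();
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(Files::isRegularFile)
                    .filter(p -> {
                        try {
                            return isOlderThan(Files.getLastModifiedTime(p).toInstant(), now, maxAge);
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    })
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical mount point of a job's shared folder on Ceph
        Path shared = Paths.get(args.length > 0 ? args[0] : ".");
        filesOlderThan(shared, Duration.ofDays(3)).forEach(System.out::println);
    }
}
```

A file's age alone does not show whether the retained checkpoint still references it; with incremental checkpoints an unchanged file can be reused by many later checkpoints, which is what the metadata check in the answer below addresses.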

Sometimes we also see checkpoints that we assume (because their index numbers are not in sync) belong to a job that crashed, and whose checkpoint was never used to restore the job.
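To compare index numbers across jobs, a JDK-only sketch like the following can print, for every job-ID directory under the checkpoint root, the highest chk-<n> directory it contains. It assumes Flink's usual layout of <checkpoint-dir>/<job-id>/chk-<n> next to the shared and taskowned folders, and a mounted path; the class name LatestCheckpoints is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LatestCheckpoints {

    // Matches per-checkpoint directory names such as "chk-42"
    private static final Pattern CHK = Pattern.compile("chk-(\\d+)");

    // Checkpoint number encoded in a "chk-<n>" directory name, if any
    static OptionalLong checkpointId(String dirName) {
        Matcher m = CHK.matcher(dirName);
        return m.matches() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }

    // Highest chk-<n> number directly under one job-ID directory
    static OptionalLong latestCheckpoint(Path jobDir) throws IOException {
        try (Stream<Path> children = Files.list(jobDir)) {
            return children.filter(Files::isDirectory)
                    .map(p -> checkpointId(p.getFileName().toString()))
                    .filter(OptionalLong::isPresent)
                    .mapToLong(OptionalLong::getAsLong)
                    .max();
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical mounted checkpoint root (the configured checkpoint directory)
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        List<Path> jobDirs;
        try (Stream<Path> dirs = Files.list(root)) {
            jobDirs = dirs.filter(Files::isDirectory).collect(Collectors.toList());
        }
        for (Path jobDir : jobDirs) {
            OptionalLong latest = latestCheckpoint(jobDir);
            System.out.println(jobDir.getFileName() + " -> "
                    + (latest.isPresent() ? "chk-" + latest.getAsLong() : "no chk-* directory"));
        }
    }
}
```

A job directory whose latest number lags far behind the running job's is only a hint that it belongs to a crashed job, not proof that its files are unused.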

My questions are:

  • Why do we see data that is older than the lateness configuration allows?
  • How do I know whether files belong to a valid checkpoint and not to a checkpoint of a crashed job, so that we can delete them?


After investigating, and with the help of Yun Tang on the Apache Flink user mailing list, I wrote the following code. It reads a checkpoint's metadata and prints the names of all state files that the checkpoint references.
metadataPath is the path to the _metadata file in the checkpoint/savepoint folder.
This was tested on Flink 1.6.3.

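    import java.io.DataInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    import org.apache.flink.runtime.checkpoint.Checkpoints;
    import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
    import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
    import org.apache.flink.runtime.state.StreamStateHandle;
    import org.apache.flink.runtime.state.filesystem.FileStateHandle;
    import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

    public class CheckpointTool {

        public static void main(String[] args) throws Exception {
            String metadataPath = args[0]; // path to the _metadata file

            final Savepoint savepoint;
            try (DataInputStream in = new DataInputStream(new FileInputStream(metadataPath))) {
                savepoint = Checkpoints.loadCheckpointMetadata(in, CheckpointTool.class.getClassLoader());
            }

            // File names of all shared and private state referenced by incremental (RocksDB) keyed state
            final Set<String> pathSharedFromMetadata = savepoint.getOperatorStates().stream()
                    .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream())
                    .flatMap(subtaskState -> subtaskState.getManagedKeyedState().stream())
                    .filter(handle -> handle instanceof IncrementalKeyedStateHandle) // skip non-incremental handles
                    .map(handle -> (IncrementalKeyedStateHandle) handle)
                    .flatMap(handle -> Stream.concat(
                            handle.getSharedState().values().stream(),
                            handle.getPrivateState().values().stream()))
                    .map(CheckpointTool::fileName)
                    .collect(Collectors.toSet());
            System.out.println("pathSharedFromMetadata:" + pathSharedFromMetadata);
        }

        private static String fileName(StreamStateHandle handle) {
            if (handle instanceof FileStateHandle) {
                return ((FileStateHandle) handle).getFilePath().getName().trim();
            } else if (handle instanceof ByteStreamStateHandle) {
                // typically small state stored inline in _metadata rather than in its own file
                return new File(((ByteStreamStateHandle) handle).getHandleName()).getName().trim();
            }
            throw new IllegalStateException("Unexpected state handle type: " + handle.getClass());
        }
    }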
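With the referenced names in hand, the remaining step for the second question is a set difference: files present in the shared folder but absent from the metadata. The JDK-only sketch below does that. It assumes the shared folder is reachable as a mounted path and that the names printed by the tool above are passed on the command line; the class name OrphanCandidates is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OrphanCandidates {

    // Names on disk that the metadata does not reference (sorted for readable output)
    static Set<String> unreferenced(Collection<String> filesOnDisk, Set<String> referenced) {
        return filesOnDisk.stream()
                .filter(name -> !referenced.contains(name))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // Names of the regular files directly inside dir
    static Set<String> listFileNames(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(Files::isRegularFile)
                    .map(p -> p.getFileName().toString())
                    .collect(Collectors.toCollection(TreeSet::new));
        }
    }

    public static void main(String[] args) throws IOException {
        // args[0]: hypothetical mounted path of the job's shared folder
        // args[1..]: file names printed as pathSharedFromMetadata by CheckpointTool
        Set<String> referenced = new TreeSet<>();
        for (int i = 1; i < args.length; i++) {
            referenced.add(args[i]);
        }
        unreferenced(listFileNames(Paths.get(args[0])), referenced).forEach(System.out::println);
    }
}
```

The output is only a list of deletion candidates: a file missing from the latest retained checkpoint may still belong to a checkpoint that is in progress, or be referenced by a job restored from this one, so it is worth re-checking before deleting anything.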