Before Flink 1.2, the only way to retain state after a job was terminated, was cancelled, or failed persistently was a savepoint, which has to be triggered manually. Version 1.2 introduced persistent (externalized) checkpoints.
Persistent checkpoints behave very much like regular periodic checkpoints, with two differences: their metadata is written to persistent storage, and they are not automatically discarded when the job fails.
In this respect they are very similar to savepoints; in fact, savepoints are just externalized checkpoints with a bit more information.
Important note: At the moment, Flink's checkpoint coordinator retains only the last successfully completed checkpoint. Whenever a new checkpoint completes, the previously completed checkpoint is discarded. This also applies to externalized checkpoints.
The location where the metadata about externalized checkpoints is stored is configured in flink-conf.yaml (and cannot be overridden through code):
# path to the externalized checkpoints
state.checkpoints.dir: file:///tmp/flink-backend/ext-checkpoints
Note that this directory contains only the checkpoint metadata required to restore the checkpoint. The actual checkpoint files are still stored in their configured directory (the state.backend.fs.checkpointdir property).
You need to explicitly enable externalized checkpoints in the code, using the getCheckpointConfig() method of the streaming environment:
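import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable regular checkpoints
env.enableCheckpointing(5000); // every 5 sec.
// enable externalized checkpoints and keep them when the job is cancelled
env.getCheckpointConfig()
    .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);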
The available ExternalizedCheckpointCleanup modes are:
RETAIN_ON_CANCELLATION: the last checkpoint and its metadata are kept on job cancellation; it is your responsibility to clean up afterwards.
DELETE_ON_CANCELLATION: the last checkpoint is deleted upon cancellation, meaning it is only available if the application fails.
To resume from an externalized checkpoint, use the savepoint syntax. For example:
flink run -s /tmp/flink-backend/ext-checkpoints/savepoint-02d0cf7e02ea app.jar
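Because the resume command needs the exact path of the metadata file, it can help to look that path up programmatically. The following is a minimal sketch, not part of Flink: the class LatestCheckpointSketch and its methods are hypothetical, and it assumes the metadata is written as regular files directly inside the directory configured in state.checkpoints.dir. It makes no assumption about file names and simply picks the most recently modified file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

/** Hypothetical helper (not a Flink API): finds the newest file in the metadata directory. */
final class LatestCheckpointSketch {

    /** Returns the most recently modified regular file directly inside dir, if there is one. */
    static Optional<Path> latestMetadataFile(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries
                    .filter(Files::isRegularFile)
                    .max(Comparator.comparingLong(LatestCheckpointSketch::lastModifiedMillis));
        }
    }

    /** Reads the modification time, wrapping the checked exception so it can be used in a stream. */
    private static long lastModifiedMillis(Path file) {
        try {
            return Files.getLastModifiedTime(file).toMillis();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds the resume command in the same form as the example above. */
    static String resumeCommand(Path metadataFile, String jar) {
        return "flink run -s " + metadataFile + " " + jar;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the default matches the state.checkpoints.dir value shown earlier.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/tmp/flink-backend/ext-checkpoints");
        if (!Files.isDirectory(dir)) {
            System.out.println("Not a directory: " + dir);
            return;
        }
        Optional<Path> latest = latestMetadataFile(dir);
        if (latest.isPresent()) {
            System.out.println(resumeCommand(latest.get(), "app.jar"));
        } else {
            System.out.println("No checkpoint metadata found in " + dir);
        }
    }
}
```

Since only the last completed checkpoint is retained, the directory will often hold a single candidate per job; selecting by modification time is just a defensive choice for directories shared by several runs.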