Apache Flink: Savepoints and externalized checkpoints
Externalized checkpoints (Flink 1.2+)



Before Flink 1.2, the only way to persist state and retain a checkpoint after a job was terminated, cancelled, or failed permanently was a savepoint, which must be triggered manually. Flink 1.2 introduced persistent (externalized) checkpoints.

Persistent checkpoints behave very much like regular periodic checkpoints, with the following differences:

  1. They persist their metadata into persistent storage (like savepoints).
  2. They are not discarded when the owning job fails permanently. Furthermore, they can be configured not to be discarded when the job is cancelled.

They are thus very similar to savepoints; in fact, savepoints are just externalized checkpoints with a bit of extra information.

Important note: At the moment, Flink's checkpoint coordinator retains only the last successfully completed checkpoint. This means that whenever a new checkpoint completes, the previously completed checkpoint is discarded. This also applies to externalized checkpoints.
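To make the retention rule concrete, here is a tiny, self-contained Java model of it. It is a hypothetical illustration, not Flink's checkpoint coordinator: the class RetentionSketch and its methods are invented for this example and only mimic the "keep the last completed checkpoint, discard the previous one" behavior described above.

```java
// Hypothetical, simplified model of the retention rule described above.
// It is NOT Flink's CheckpointCoordinator API; it only mimics the behavior
// "keep the last completed checkpoint, discard the previous one".
public class RetentionSketch {

    // ID of the only checkpoint currently retained (null before the first one completes).
    private Long lastCompleted = null;

    // Called when a checkpoint completes; returns the ID of the checkpoint
    // that is discarded as a consequence, or null if none was retained yet.
    public Long onCheckpointCompleted(long checkpointId) {
        Long discarded = lastCompleted;
        lastCompleted = checkpointId;
        return discarded;
    }

    public Long lastCompleted() {
        return lastCompleted;
    }

    public static void main(String[] args) {
        RetentionSketch coordinator = new RetentionSketch();
        for (long id = 1; id <= 3; id++) {
            Long discarded = coordinator.onCheckpointCompleted(id);
            System.out.println("completed " + id + ", discarded " + discarded);
        }
        System.out.println("retained: " + coordinator.lastCompleted());
    }
}
```

Running main shows that each completion discards the previous checkpoint, so only checkpoint 3 remains at the end. The same holds for an externalized checkpoint: once a newer checkpoint completes, the older externalized one is gone.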

Configuration

The directory where the metadata of externalized checkpoints is stored is configured in flink-conf.yaml (it cannot be overridden through code):

# path to the externalized checkpoints
state.checkpoints.dir: file:///tmp/flink-backend/ext-checkpoints

Note that this directory only contains the checkpoint metadata required to restore the checkpoint. The actual checkpoint files are still stored in their configured directory (e.g. the state.backend.fs.checkpointdir property).

Usage

You need to explicitly enable externalized checkpoints in your code, using the getCheckpointConfig() method of the streaming environment:

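import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable regular checkpoints; the interval is in milliseconds (every 5 s)
env.enableCheckpointing(5000);
// enable externalized checkpoints, keeping the last one when the job is cancelled
env.getCheckpointConfig()
    .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);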

The available ExternalizedCheckpointCleanup modes are:

  • RETAIN_ON_CANCELLATION: the last checkpoint and its metadata are kept on job cancellation; it is your responsibility to clean up afterwards.
  • DELETE_ON_CANCELLATION: the last checkpoint is deleted upon cancellation, meaning it is only available if the application fails.
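The interplay between the two cleanup modes and the way a job terminates can be summarized as a small decision function. This is a hypothetical Java model for illustration only: the Cleanup enum merely mirrors the constant names of CheckpointConfig.ExternalizedCheckpointCleanup, while CleanupSketch, Termination and isRetained are invented for this sketch; the real decision is made inside Flink.

```java
// Hypothetical model (not Flink API) of when the last externalized checkpoint
// survives job termination, following the two cleanup modes described above.
public class CleanupSketch {

    // Mirrors the constant names of CheckpointConfig.ExternalizedCheckpointCleanup.
    enum Cleanup { RETAIN_ON_CANCELLATION, DELETE_ON_CANCELLATION }

    // Ways a job can terminate, simplified for this sketch.
    enum Termination { CANCELLED, FAILED_PERMANENTLY }

    static boolean isRetained(Cleanup mode, Termination termination) {
        switch (termination) {
            case FAILED_PERMANENTLY:
                // Externalized checkpoints are not discarded when the job fails permanently.
                return true;
            case CANCELLED:
                // On cancellation, only RETAIN_ON_CANCELLATION keeps the checkpoint.
                return mode == Cleanup.RETAIN_ON_CANCELLATION;
            default:
                throw new IllegalArgumentException("unknown termination: " + termination);
        }
    }

    public static void main(String[] args) {
        // Print the full mode/termination matrix.
        for (Cleanup mode : Cleanup.values()) {
            for (Termination t : Termination.values()) {
                System.out.println(mode + " / " + t + " -> retained: " + isRetained(mode, t));
            }
        }
    }
}
```

Only one combination loses the checkpoint: DELETE_ON_CANCELLATION with a cancelled job. With RETAIN_ON_CANCELLATION the checkpoint always survives, which is why cleaning it up becomes your responsibility.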

To resume from an externalized checkpoint, use the same syntax as for resuming from a savepoint, passing the path of the checkpoint metadata. For example:

flink run -s /tmp/flink-backend/ext-checkpoints/savepoint-02d0cf7e02ea app.jar