apache-flink: Savepoints and externalized checkpoints

Savepoints


Example

Configuration

The configuration file is flink/conf/flink-conf.yaml (on macOS, installed via Homebrew, it is /usr/local/Cellar/apache-flink/1.1.3/libexec/conf/flink-conf.yaml).

Flink < 1.2: The configuration is very similar to the checkpoints configuration (covered in a dedicated topic). The only difference is that an in-memory savepoint backend makes no sense, since savepoints must persist after Flink shuts down.

# Supported backends: filesystem, <class-name-of-factory>
savepoints.state.backend: filesystem
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# (or any local file system under Windows), or "S3://" for S3 file system.
# Note: must be accessible from the JobManager and all TaskManagers !
savepoints.state.backend.fs.checkpointdir: file:///tmp/flink-backend/savepoints

Note: if you don't specify a backend, the default backend is jobmanager, meaning that your savepoints will disappear once the cluster is shut down. This is useful for debugging only.

Flink 1.2+: as explained in this JIRA ticket, allowing a savepoint to be stored in the jobmanager's memory makes little sense. Since Flink 1.2, savepoints are necessarily stored in files. The configuration above has been replaced by:

# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints

Usage

Getting the job ID

To trigger a savepoint, all you need is the job ID of the application. The job ID is printed to the console when you launch the job, or can be retrieved later using flink list:

flink list
Retrieving JobManager.
Using address localhost/127.0.0.1:6123 to connect to JobManager.
------------------ Running/Restarting Jobs -------------------
17.03.2017 11:44:03 : 196b8ce6788d0554f524ba747c4ea54f : CheckpointExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
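In scripts it can be handy to extract the job ID from the flink list output automatically. A minimal sketch of one way to parse it, assuming the output format shown above (the sample line is hard-coded here instead of calling flink, so the parsing can be tried anywhere):

```shell
# Parse a `flink list` output line of the form
#   <date> <time> : <jobID> : <name> (RUNNING)
# by splitting on " : " and keeping the second field.
line='17.03.2017 11:44:03 : 196b8ce6788d0554f524ba747c4ea54f : CheckpointExample (RUNNING)'
job_id=$(printf '%s\n' "$line" | awk -F' : ' '/RUNNING/ {print $2}')
printf '%s\n' "$job_id"
```

In a real script you would pipe the output of flink list into the awk command instead of the hard-coded line.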

Triggering a savepoint

To trigger a savepoint, use flink savepoint <jobID>:

flink savepoint 196b8ce6788d0554f524ba747c4ea54f
Retrieving JobManager.
Using address /127.0.0.1:6123 to connect to JobManager.
Triggering savepoint for job 196b8ce6788d0554f524ba747c4ea54f.
Waiting for response...
Savepoint completed. Path: file:/tmp/flink-backend/savepoints/savepoint-a40111f915fc
You can resume your program from this savepoint with the run command.

Note that you can also provide a target directory as a second argument; it will override the default directory defined in flink/conf/flink-conf.yaml.

In Flink 1.2+, it is also possible to cancel a job AND take a savepoint at the same time, using the -s option:

flink cancel -s 196b8ce6788d0554f524ba747c4ea54f # use default savepoints dir
flink cancel -s hdfs:///savepoints 196b8ce6788d0554f524ba747c4ea54f # specify target dir

Note: it is possible to move a savepoint, but do not rename it!

Resuming from a savepoint

To resume from a specific savepoint, use the -s <savepoint-path> option of the flink run command:

flink run -s /tmp/flink-backend/savepoints/savepoint-a40111f915fc app.jar
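If the new program no longer contains an operator whose state is stored in the savepoint, the restore will fail by default. Since Flink 1.2, flink run also accepts an allowNonRestoredState flag (-n) to skip state that cannot be mapped to the new program; a sketch, reusing the savepoint path from the example above:

```
# Skip savepoint state that has no matching operator in the new program
flink run -s /tmp/flink-backend/savepoints/savepoint-a40111f915fc -n app.jar
```

Use this only when you intentionally removed a stateful operator; otherwise a failed restore usually indicates a missing or changed operator UID.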

Specifying operator UIDs

To be able to resume from a savepoint after a code change, you must ensure that the new code uses the same UIDs for its operators. To manually assign a UID, call the .uid(<name>) method right after the operator:

env
    .addSource(source)
    .uid(className + "-KafkaSource01")    // stable UID for the source's state
    .rebalance()                          // stateless, no UID needed
    .keyBy((node) -> node.get("key").asInt())
    .flatMap(new StatefulMapper())
    .uid(className + "-StatefulMapper01") // stable UID for the mapper's state
    .print();