Checkpointing is configured in two steps. First, choose a backend. Then, specify the interval and mode of the checkpoints on a per-application basis.
Available backends
Where the checkpoints are stored depends on the configured backend:
MemoryStateBackend
: in-memory state, backed up to the JobManager's/ZooKeeper's memory. Should be used only for minimal state (limited to 5 MB by default; storing Kafka offsets, for example) or for testing and local debugging.
FsStateBackend
: the state is kept in memory on the TaskManagers, and state snapshots (i.e. checkpoints) are stored in a file system (HDFS, S3, local filesystem, ...). This setup is encouraged for large states or long windows and for high-availability setups.
RocksDBStateBackend
: holds in-flight data in a RocksDB database that is (by default) stored in the TaskManager data directories. Upon checkpointing, the whole RocksDB database is written to a file (as with the FsStateBackend). Compared to the FsStateBackend, it allows for larger states (limited only by disk space rather than by the size of the TaskManager memory), but throughput will be lower (data is not always in memory and must be loaded from disk).
Note that whatever the backend, metadata (number of checkpoints, location, etc.) is always stored in the JobManager memory, and checkpoints won't persist after the application terminates or is cancelled.
Specifying the backend
You specify the backend in your program's main method using:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
Or set the default backend in flink/conf/flink-conf.yaml:
# Supported backends:
# - jobmanager (MemoryStateBackend),
# - filesystem (FsStateBackend),
# - rocksdb (RocksDBStateBackend),
# - <class-name-of-factory>
state.backend: filesystem
# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# "S3://" for S3 file system.
state.backend.fs.checkpointdir: file:///tmp/flink-backend/checkpoints
Every application needs to explicitly enable checkpoints:
long checkpointInterval = 5000; // every 5 seconds
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(checkpointInterval);
You can optionally specify a checkpoint mode. If you do not, it defaults to exactly once:
env.enableCheckpointing(checkpointInterval, CheckpointingMode.AT_LEAST_ONCE);
The checkpointing mode defines what consistency guarantees the system gives in the presence of failures. When checkpointing is activated, the data streams are replayed such that lost parts of the processing are repeated. With EXACTLY_ONCE, the system draws checkpoints such that a recovery behaves as if the operators/functions see each record "exactly once". With AT_LEAST_ONCE, the checkpoints are drawn in a simpler fashion that typically produces some duplicates upon recovery.
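To make the difference between the two modes concrete, here is a toy model in plain Java. It is not Flink code and does not reflect Flink's internals: the class CheckpointReplayDemo, the Snapshot type and the extraInState parameter are all hypothetical names invented for this sketch. It assumes a single counting operator and one failure. A snapshot stores the source offset to replay from together with the operator's count. When the count in the snapshot matches the offset exactly, recovery behaves like exactly-once. When the count already includes a few records past the recorded offset (one simplified way to picture a snapshot drawn "in a simpler fashion"), those records are replayed and counted twice.

```java
/**
 * Toy model of checkpoint-and-replay. NOT Flink code; every name here is
 * hypothetical and exists only to illustrate the two consistency modes.
 */
public class CheckpointReplayDemo {

    /** A snapshot: where the source resumes, and the operator state at that time. */
    static final class Snapshot {
        final int sourceOffset; // first record the source replays after recovery
        final long count;       // operator state (running count) saved in the snapshot

        Snapshot(int sourceOffset, long count) {
            this.sourceOffset = sourceOffset;
            this.count = count;
        }
    }

    /**
     * Counts totalRecords records with one snapshot and one simulated crash.
     *
     * @param checkpointAt source offset recorded in the snapshot
     * @param extraInState records past checkpointAt already reflected in the
     *                     saved state (0 models exactly-once, > 0 at-least-once)
     * @param failAt       number of records processed before the crash
     */
    public static long countWithOneFailure(int totalRecords, int checkpointAt,
                                           int extraInState, int failAt) {
        if (checkpointAt < 0 || extraInState < 0
                || checkpointAt + extraInState > failAt || failAt > totalRecords) {
            throw new IllegalArgumentException(
                "need 0 <= checkpointAt + extraInState <= failAt <= totalRecords");
        }

        long count = 0;
        Snapshot snapshot = new Snapshot(0, 0); // implicit snapshot at job start

        // First attempt: process records until the simulated crash.
        for (int offset = 0; offset < failAt; offset++) {
            count++; // "process" the record at this offset
            if (offset + 1 == checkpointAt + extraInState) {
                // The saved state may be ahead of the recorded source offset.
                snapshot = new Snapshot(checkpointAt, count);
            }
        }

        // Crash: in-memory state is lost. Restore the snapshot and let the
        // source replay everything from the recorded offset.
        count = snapshot.count;
        for (int offset = snapshot.sourceOffset; offset < totalRecords; offset++) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // State and offset agree: each of the 10 records is counted once.
        System.out.println(countWithOneFailure(10, 4, 0, 7)); // prints 10
        // State is 2 records ahead of the offset: 2 records are counted twice.
        System.out.println(countWithOneFailure(10, 4, 2, 7)); // prints 12
    }
}
```

In this model the number of duplicates equals extraInState. In a real job, the cost of avoiding such duplicates is the extra coordination needed to draw a consistent snapshot, which is why AT_LEAST_ONCE can be an acceptable choice when downstream processing tolerates duplicates.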