apache-flink Checkpointing Testing checkpoints


The code

Here is a simple Flink application that uses a stateful mapper with an Integer managed state. Change the checkpointEnable, checkpointInterval and checkpointMode variables to see their effect:

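import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
// package of SimpleStringSchema in Flink 1.2/1.3; later releases moved it
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.util.Properties;

import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;

public class CheckpointExample {

    private static final Logger LOG = LoggerFactory.getLogger(CheckpointExample.class);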
    private static final String KAFKA_BROKER = "localhost:9092";
    private static final String KAFKA_INPUT_TOPIC = "input-topic";
    private static final String KAFKA_GROUP_ID = "flink-stackoverflow-checkpointer";
    private static final String CLASS_NAME = CheckpointExample.class.getSimpleName();

    public static void main(String[] args) throws Exception {

        // play with them
        boolean checkpointEnable = false;
        long checkpointInterval = 1000;
        CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;

        // ----------------------------------------------------

        LOG.info(CLASS_NAME + ": starting...");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // kafka source
        // https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#kafka-consumer
        Properties prop = new Properties();
        prop.put("bootstrap.servers", KAFKA_BROKER);
        prop.put("group.id", KAFKA_GROUP_ID);
        prop.put("auto.offset.reset", "latest");
        prop.put("enable.auto.commit", "false");

        FlinkKafkaConsumer09<String> source = new FlinkKafkaConsumer09<>(
                KAFKA_INPUT_TOPIC, new SimpleStringSchema(), prop);

        // checkpoints
        // internals: https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#checkpointing
        // config: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html
        if (checkpointEnable) env.enableCheckpointing(checkpointInterval, checkpointMode);

        // send every record to the same key, count it and print the result to stdout
        env
                .addSource(source)
                .keyBy((any) -> 1)
                .flatMap(new StatefulMapper())
                .print();

        env.execute(CLASS_NAME);
    }

    /* *****************************************************************
     * Stateful mapper
     * (cf. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html)
     * ****************************************************************/

    public static class StatefulMapper extends RichFlatMapFunction<String, String> {
        private transient ValueState<Integer> state;

        @Override
        public void flatMap(String record, Collector<String> collector) throws Exception {
            // access the state value
            Integer currentState = state.value();

            // update the counts
            currentState += 1;
            collector.collect(String.format("%s: (%s,%d)",
                    LocalDateTime.now().format(ISO_LOCAL_DATE_TIME), record, currentState));
            // update the state
            state.update(currentState);
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // register the managed state, with 0 as its default value
            ValueStateDescriptor<Integer> descriptor =
                    new ValueStateDescriptor<>("CheckpointExample", TypeInformation.of(Integer.class), 0);
            state = getRuntimeContext().getState(descriptor);
        }
    }
}

Running the example and simulating failure

To observe the checkpoints in action, you need to start a cluster. The easiest way is to use the start-cluster.sh script in the flink/bin directory:

flink/bin/start-cluster.sh
Starting cluster.
[INFO] 1 instance(s) of jobmanager are already running on virusnest.
Starting jobmanager daemon on host virusnest.
Starting taskmanager daemon on host virusnest.

Now, package your app and submit it to Flink (the -c option, which names the entry class, must come before the jar path):

mvn clean package
flink run -c CheckpointExample target/flink-checkpoints-test.jar

Create some data by typing records into the Kafka console producer:

kafka-console-producer --broker-list localhost:9092 --topic input-topic

The output is written by the taskmanager to flink/logs/flink-<user>-taskmanager-0-<host>.out. For example:

tail -f flink/logs/flink-Derlin-taskmanager-0-virusnest.out
2017-03-17T08:21:51.249: (a,1)
2017-03-17T08:21:51.545: (b,2)
2017-03-17T08:21:52.363: (c,3)
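
When the output grows, spotting a missing or repeated counter by eye becomes tedious. The following is a minimal sketch of a checker, assuming the output lines keep the `timestamp: (record,counter)` shape shown above. The class name `CounterOutputCheck` and its methods are hypothetical helpers, not part of Flink:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: checks the counters found in the job output.
class CounterOutputCheck {

    // Matches lines such as "2017-03-17T08:21:51.249: (a,1)";
    // group 1 is the record, group 2 is the counter.
    private static final Pattern LINE = Pattern.compile("^\\S+: \\((.*),(\\d+)\\)$");

    /** Extracts the counter of each well-formed line and skips everything else. */
    static List<Integer> counters(List<String> lines) {
        List<Integer> result = new ArrayList<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (m.matches()) {
                result.add(Integer.parseInt(m.group(2)));
            }
        }
        return result;
    }

    /** Describes every position where the counter does not increase by exactly one. */
    static List<String> anomalies(List<Integer> counters) {
        List<String> found = new ArrayList<>();
        for (int i = 1; i < counters.size(); i++) {
            int previous = counters.get(i - 1);
            int current = counters.get(i);
            if (current <= previous) {
                // a repeated or lower value: some record was counted or printed again
                found.add("counter went back from " + previous + " to " + current);
            } else if (current > previous + 1) {
                // a gap: at least one line is missing from this file
                found.add("counter jumped from " + previous + " to " + current);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "2017-03-17T08:21:51.249: (a,1)",
                "2017-03-17T08:21:51.545: (b,2)",
                "2017-03-17T08:21:52.363: (c,3)");
        System.out.println(anomalies(counters(sample))); // prints []
    }
}
```

To use it on a real run, read the lines of the .out file (for instance with `java.nio.file.Files.readAllLines`) and pass them to `counters`. Keep in mind that a restarted taskmanager writes to a different file, so a gap within one file is not necessarily a lost record.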

To test the checkpoints, kill the taskmanager to emulate a failure, produce some data while it is down, then start a new one:

# killing the taskmanager
ps -ef | grep -i taskmanager
kill <taskmanager PID>

# starting a new taskmanager
flink/bin/taskmanager.sh start

Note: the new taskmanager writes to another log file, namely flink/logs/flink-<user>-taskmanager-1-<host>.out (notice the incremented integer).

What to expect

  • checkpoints disabled: any data you produce during the failure is definitely lost. Surprisingly enough, though, the counters will still be right!
  • checkpoints enabled: no more data loss (and correct counters).
  • checkpoints with at-least-once mode: you may see duplicates, especially if you set a high checkpoint interval and kill the taskmanager several times.
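
To build an intuition for why a restored job ends up with correct counters, here is a toy model in plain Java. It is a sketch under strong assumptions, not Flink's implementation: a snapshot stores the source offset and the counter together, a single failure rolls both back to the last snapshot, and the records read since then are replayed. The class `CheckpointReplaySketch` and its parameters (`interval`, `failAfter`) are hypothetical, and the model ignores barrier alignment, which is where the exactly-once and at-least-once modes actually differ:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only (hypothetical class): it does not use or reproduce Flink internals.
class CheckpointReplaySketch {

    /** Source position and operator state, captured at the same moment. */
    static final class Snapshot {
        final int offset;
        final int counter;

        Snapshot(int offset, int counter) {
            this.offset = offset;
            this.counter = counter;
        }
    }

    /**
     * Processes all records, takes a snapshot every `interval` records and fails
     * once, right after the record at index `failAfter` has been emitted.
     * Returns every emitted line, including lines emitted again during replay.
     */
    static List<String> run(List<String> records, int interval, int failAfter) {
        List<String> emitted = new ArrayList<>();
        Snapshot last = new Snapshot(0, 0); // nothing read yet, counter at its default
        int offset = 0;
        int counter = 0;
        boolean failed = false;

        while (offset < records.size()) {
            counter += 1;
            emitted.add("(" + records.get(offset) + "," + counter + ")");
            offset += 1;

            if (offset % interval == 0) {
                last = new Snapshot(offset, counter); // checkpoint
            }
            if (!failed && offset - 1 == failAfter) {
                // simulated failure: offset and counter are restored together
                failed = true;
                offset = last.offset;
                counter = last.counter;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d", "e");
        // snapshot every 2 records, fail right after "c" (index 2)
        System.out.println(run(records, 2, 2));
        // prints [(a,1), (b,2), (c,3), (c,3), (d,4), (e,5)]
    }
}
```

In this model the final counter is 5 for 5 records, because the counter and the offset are rolled back as a pair. The record read after the last snapshot is emitted a second time with the same counter value; whether you observe such a repeated line in a real run depends on timing and on the sink, which the model does not capture.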