As explained in the docs,
With Flinkās checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that where stored in the checkpoint.
The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
To use fault tolerant Kafka Consumers, you need to enable checkpointing at the execution environment using the enableCheckpointing
method:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5 seconds