Here is a simple Flink application using a stateful mapper with an Integer managed state. You can play with the checkpointEnable, checkpointInterval and checkpointMode variables to see their effect:
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.util.Properties;

import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;

public class CheckpointExample {

    private static final Logger LOG = LoggerFactory.getLogger(CheckpointExample.class);
    private static final String KAFKA_BROKER = "localhost:9092";
    private static final String KAFKA_INPUT_TOPIC = "input-topic";
    private static final String KAFKA_GROUP_ID = "flink-stackoverflow-checkpointer";
    private static final String CLASS_NAME = CheckpointExample.class.getSimpleName();

    public static void main(String[] args) throws Exception {
        // play with them
        boolean checkpointEnable = false;
        long checkpointInterval = 1000;
        CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;
        // ----------------------------------------------------

        LOG.info(CLASS_NAME + ": starting...");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // kafka source
        // https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#kafka-consumer
        Properties prop = new Properties();
        prop.put("bootstrap.servers", KAFKA_BROKER);
        prop.put("group.id", KAFKA_GROUP_ID);
        prop.put("auto.offset.reset", "latest");
        prop.put("enable.auto.commit", "false");

        FlinkKafkaConsumer09<String> source = new FlinkKafkaConsumer09<>(
                KAFKA_INPUT_TOPIC, new SimpleStringSchema(), prop);

        // checkpoints
        // internals: https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#checkpointing
        // config: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html
        if (checkpointEnable) env.enableCheckpointing(checkpointInterval, checkpointMode);

        env
                .addSource(source)
                .keyBy((any) -> 1)
                .flatMap(new StatefulMapper())
                .print();

        env.execute(CLASS_NAME);
    }

    /* *****************************************************************
     * Stateful mapper
     * (cf. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html)
     * ****************************************************************/
    public static class StatefulMapper extends RichFlatMapFunction<String, String> {
        private transient ValueState<Integer> state;

        @Override
        public void flatMap(String record, Collector<String> collector) throws Exception {
            // access the state value
            Integer currentState = state.value();
            // update the counts
            currentState += 1;
            collector.collect(String.format("%s: (%s,%d)",
                    LocalDateTime.now().format(ISO_LOCAL_DATE_TIME), record, currentState));
            // update the state
            state.update(currentState);
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Integer> descriptor =
                    new ValueStateDescriptor<>("CheckpointExample", TypeInformation.of(Integer.class), 0);
            state = getRuntimeContext().getState(descriptor);
        }
    }
}
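If you want to go beyond the interval and the mode, here is a minimal sketch of some extra checkpoint tuning, assuming the Flink 1.3 CheckpointConfig and FsStateBackend APIs; the numbers and the checkpoint directory are only illustrative, not part of the example above. It would replace the single enableCheckpointing call inside main():
// extra imports needed for this snippet:
//   org.apache.flink.streaming.api.environment.CheckpointConfig
//   org.apache.flink.runtime.state.filesystem.FsStateBackend
env.enableCheckpointing(checkpointInterval, checkpointMode);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setMinPauseBetweenCheckpoints(500);   // leave at least 500 ms between two checkpoints
checkpointConfig.setCheckpointTimeout(60_000);         // give up on a checkpoint after one minute
checkpointConfig.setMaxConcurrentCheckpoints(1);       // never run two checkpoints at the same time
// keep the last checkpoint around when the job is cancelled, so you can restore from it later
checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// write the state snapshots to the filesystem instead of the jobmanager heap (path is only an example)
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));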
To be able to check the checkpoints, you need to start a cluster. The easiest way is to use the start-cluster.sh script in the flink/bin directory:
start-cluster.sh
Starting cluster.
[INFO] 1 instance(s) of jobmanager are already running on virusnest.
Starting jobmanager daemon on host virusnest.
Password:
Starting taskmanager daemon on host virusnest.
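Before submitting anything, it does not hurt to check that both daemons are really up. One quick way, assuming a JDK on the path, is to look at the JVM process names; once the job is running you can also watch the checkpoint statistics in the job's Checkpoints tab of the web dashboard (http://localhost:8081 by default):
jps | grep -E 'JobManager|TaskManager'
# you should see one JobManager and one TaskManager process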
Now, package your app and submit it to Flink:
mvn clean package
flink run -c CheckpointExample target/flink-checkpoints-test.jar
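For the build itself, here is a sketch of the dependencies the example needs, assuming Flink 1.3.x with the Scala 2.11 builds (adjust the versions and Scala suffix to your setup); the Kafka connector has to end up inside the jar you submit, typically via the maven-shade plugin:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.3.2</version>
</dependency>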
Create some data:
kafka-console-producer --broker-list localhost:9092 --topic input-topic
a
b
c
^D
The output should be available in flink/logs/flink-<user>-jobmanager-0-<host>.out. For example:
tail -f flink/logs/flink-Derlin-jobmanager-0-virusnest.out
2017-03-17T08:21:51.249: (a,1)
2017-03-17T08:21:51.545: (b,2)
2017-03-17T08:21:52.363: (c,3)
To test the checkpoints, simply kill the taskmanager (this will emulate a failure), produce some data and start a new one:
# killing the taskmanager
ps -ef | grep -i taskmanager
kill <taskmanager PID>
# starting a new taskmanager
flink/bin/taskmanager.sh start
Note: when starting a new taskmanager, it will use another log file, namely flink/logs/flink-<user>-jobmanager-1-<host>.out (notice the integer increment).
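Once the new taskmanager has registered, produce a few more records and watch the output again. The sketch below shows that last step; the record d and the count in the comments are illustrative, not captured output. Keep in mind that enabling checkpointing also switches Flink to a fixed-delay restart strategy, whereas with checkpointing disabled the default is no restart at all, so in that case the job simply stays failed and the state is gone:
# produce a few more records after the recovery
kafka-console-producer --broker-list localhost:9092 --topic input-topic
d
^D

# checkpointEnable = true  -> the job restarts and the counter resumes, e.g. (d,4)
# checkpointEnable = false -> no automatic restart, so no new output at all
tail -f flink/logs/*.out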