FlinkKafkaConsumer
lets you consume data from one or more Kafka topics.
Which consumer to use depends on your Kafka version.
FlinkKafkaConsumer08
: uses the old SimpleConsumer API of Kafka. Offsets are handled by Flink and committed to ZooKeeper.

FlinkKafkaConsumer09
: uses the new Consumer API of Kafka, which handles offsets and rebalancing automatically.

FlinkKafkaConsumer010
: this connector supports Kafka messages with timestamps, both for producing and consuming (useful for window operations).

The binaries are not part of the Flink core, so you need to import them:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.${kafka.version}_2.10</artifactId>
  <version>RELEASE</version>
</dependency>
The constructor takes three arguments: the topic name, a deserialization schema, and a Properties object. The properties must include:

bootstrap.servers
: a comma-separated list of Kafka brokers in the form ip:port. For version 0.8, use zookeeper.connect
(a list of ZooKeeper servers) instead.

group.id
: the id of the consumer group (see the Kafka documentation for more details).

In Java:
Properties properties = new Properties();
properties.put("group.id", "flink-kafka-example");
properties.put("bootstrap.servers", "localhost:9092");
DataStream<String> inputStream = env.addSource(
    new FlinkKafkaConsumer09<>(
        kafkaInputTopic, new SimpleStringSchema(), properties));
In Scala:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val inputStream = env.addSource(
  new FlinkKafkaConsumer08[String](
    "topic", new SimpleStringSchema(), properties))
During development, you can use the Kafka properties enable.auto.commit=false
and auto.offset.reset=earliest
to re-consume the same data every time you launch your program.
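As a minimal sketch, these development settings can simply be added to the same Properties object you pass to the consumer (the broker address and group id below are placeholders; note that the 0.8 consumer uses the older property name auto.commit.enable rather than enable.auto.commit):

```java
import java.util.Properties;

public class DevKafkaProps {

    // Builds consumer properties for development: offsets are never
    // committed, and every run starts from the earliest available offset,
    // so the same data is re-consumed on each launch.
    static Properties devProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        properties.setProperty("group.id", "flink-kafka-example");     // placeholder group id
        properties.setProperty("enable.auto.commit", "false");   // do not commit offsets
        properties.setProperty("auto.offset.reset", "earliest"); // start from the beginning
        return properties;
    }

    public static void main(String[] args) {
        Properties p = devProperties();
        System.out.println(p.getProperty("auto.offset.reset")); // prints "earliest"
    }
}
```

In production you would drop these two settings so that the consumer resumes from its last committed offset instead of replaying the topic from the start.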