apache-flink Consume data from Kafka KafkaConsumer example


FlinkKafkaConsumer lets you consume data from one or more Kafka topics.


Which consumer class to use depends on your Kafka version.

  • FlinkKafkaConsumer08: uses the old SimpleConsumer API of Kafka. Offsets are handled by Flink and committed to ZooKeeper.
  • FlinkKafkaConsumer09: uses the new Consumer API of Kafka, which handles offsets and rebalancing automatically.
  • FlinkKafkaConsumer010: the Kafka 0.10 connector supports messages with timestamps, both for producing (FlinkKafkaProducer010) and consuming (useful for window operations).


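The connectors are not part of Flink core, so you need to add the one matching your Kafka version to your build as a dependency: the flink-connector-kafka-0.8, flink-connector-kafka-0.9 or flink-connector-kafka-0.10 artifact from the org.apache.flink group (the artifact id usually carries a Scala version suffix such as _2.11).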


The constructor takes three arguments:

  • one or more topics to read from
  • a deserialization schema telling Flink how to interpret/decode the messages
  • Kafka consumer configuration properties. These are the same as for a "regular" Kafka consumer. The minimum required are:
    • bootstrap.servers: a comma-separated list of Kafka brokers in the form host:port. Version 0.8 also requires zookeeper.connect (a comma-separated list of ZooKeeper servers)
    • group.id: the id of the consumer group (see the Kafka documentation for more details)
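
The minimum properties listed above can be collected in a small helper before they are handed to the consumer. This is only a sketch: KafkaConsumerProps and minimal are hypothetical names, not part of Flink or Kafka, and the optional third argument assumes that the 0.8 consumer takes zookeeper.connect alongside bootstrap.servers.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Flink or Kafka.
final class KafkaConsumerProps {

    // Builds the minimum consumer configuration.
    // zookeeperConnect may be null when no ZooKeeper address is needed.
    static Properties minimal(String bootstrapServers, String groupId, String zookeeperConnect) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        if (zookeeperConnect != null) {
            // Assumption: only the 0.8 consumer needs this key.
            props.setProperty("zookeeper.connect", zookeeperConnect);
        }
        return props;
    }
}
```

Calling KafkaConsumerProps.minimal("localhost:9092", "flink-kafka-example", null) yields the same two entries that the Java example sets by hand. Passing a ZooKeeper address such as "localhost:2181" as the third argument adds the extra key.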

In Java:

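// Assumes env is a StreamExecutionEnvironment and kafkaInputTopic is a String holding the topic name.
Properties properties = new Properties();
properties.setProperty("group.id", "flink-kafka-example");
properties.setProperty("bootstrap.servers", "localhost:9092");

// SimpleStringSchema decodes each Kafka record as a String
DataStream<String> inputStream = env.addSource(
        new FlinkKafkaConsumer09<>(
            kafkaInputTopic, new SimpleStringSchema(), properties));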

In Scala:

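// Assumes env is a StreamExecutionEnvironment from the Scala API
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// The 0.8 consumer also needs the ZooKeeper address
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val inputStream = env.addSource(
        new FlinkKafkaConsumer08[String](
            "topic", new SimpleStringSchema(), properties))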

During development, you can set the Kafka properties enable.auto.commit=false and auto.offset.reset=earliest to re-consume the same data every time you launch your program, as long as no offsets have been committed for the group.id.
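
The development settings can be layered on top of an existing configuration so that the regular properties stay untouched. The sketch below assumes a hypothetical helper, DevKafkaProps.forDevelopment, which is not part of Flink or Kafka. It copies the given properties and adds the two keys named above.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Flink or Kafka.
final class DevKafkaProps {

    // Returns a copy of base with the two development settings applied.
    // The base object itself is left unchanged.
    static Properties forDevelopment(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Do not let the Kafka client commit offsets on its own
        props.setProperty("enable.auto.commit", "false");
        // Start from the earliest available record when no offset is stored
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

The returned object can be passed to the consumer constructor in place of the original properties while testing, and the original object can be used again once the job goes to production.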