apache-flink Consume data from Kafka KafkaConsumer example

Example

FlinkKafkaConsumer lets you consume data from one or more Kafka topics.

Versions

The consumer to use depends on your Kafka distribution.

  • FlinkKafkaConsumer08: uses the old SimpleConsumer API of Kafka. Offsets are handled by Flink and committed to ZooKeeper.
  • FlinkKafkaConsumer09: uses the new Consumer API of Kafka, which handles offsets and rebalancing automatically.
  • FlinkKafkaConsumer010: this connector supports Kafka messages with timestamps, both for producing (via FlinkKafkaProducer010) and for consuming (useful for window operations).

Usage

The connector binaries are not part of Flink core, so you need to add them as a dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.${kafka.version}_2.10</artifactId>
  <version>RELEASE</version>
</dependency>

The constructor takes three arguments:

  • one or more topics to read from
  • a deserialization schema telling Flink how to interpret/decode the messages
  • Kafka consumer configuration properties. These are the same as for a "regular" Kafka consumer. The minimum required are:
    • bootstrap.servers: a comma-separated list of Kafka brokers in the form ip:port. For version 8, also set zookeeper.connect (a list of ZooKeeper servers)
    • group.id: the id of the consumer group (see the Kafka documentation for more details)
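
The required properties listed above can be assembled with plain java.util.Properties before they are handed to the consumer. The following is only a minimal sketch: the class KafkaConsumerProps and its methods are hypothetical helpers, not part of Flink or Kafka, and the version 8 variant sets zookeeper.connect in addition to bootstrap.servers, which is an assumption of this sketch.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink or Kafka): builds the minimum
// configuration described above and fails early if a value is missing.
final class KafkaConsumerProps {

    private KafkaConsumerProps() {}

    // Rejects null or empty values so a bad configuration is caught before the job starts.
    private static String require(String key, String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(key + " is required");
        }
        return value;
    }

    // Minimum configuration: broker list plus consumer group.
    static Properties minimal(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Comma-separated list of brokers, e.g. "host1:9092,host2:9092"
        props.setProperty("bootstrap.servers", require("bootstrap.servers", bootstrapServers));
        props.setProperty("group.id", require("group.id", groupId));
        return props;
    }

    // Same as minimal(), plus the ZooKeeper servers used by the version 8 consumer.
    static Properties forKafka08(String bootstrapServers, String zookeeperConnect, String groupId) {
        Properties props = minimal(bootstrapServers, groupId);
        props.setProperty("zookeeper.connect", require("zookeeper.connect", zookeeperConnect));
        return props;
    }
}
```

The returned object can then be passed as the third constructor argument, for instance KafkaConsumerProps.minimal("localhost:9092", "flink-kafka-example").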

In Java:

// env is the job's StreamExecutionEnvironment; kafkaInputTopic is the topic name (a String)
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-kafka-example");

DataStream<String> inputStream = env.addSource(
        new FlinkKafkaConsumer09<>(
            kafkaInputTopic, new SimpleStringSchema(), properties));

In Scala:

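val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// needed by the version 8 consumer; assumes a local ZooKeeper on its default port
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val inputStream = env.addSource(
        new FlinkKafkaConsumer08[String](
            "topic", new SimpleStringSchema(), properties))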

During development, you can set the Kafka properties enable.auto.commit=false and auto.offset.reset=earliest to re-consume the same data every time you launch your program.
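
One way to keep these development settings separate from the regular configuration is to derive a second Properties object from the base one. This is a minimal sketch in plain Java: DevConsumerProps and withReplayFromStart are hypothetical names, and the two keys are the ones named in the paragraph above.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink or Kafka): derives a development
// configuration that re-reads the topic from the start on every launch.
final class DevConsumerProps {

    private DevConsumerProps() {}

    static Properties withReplayFromStart(Properties base) {
        Properties props = new Properties();
        // Copy the entries set directly on base, so base itself is left untouched.
        props.putAll(base);
        // Do not let the Kafka client commit offsets automatically ...
        props.setProperty("enable.auto.commit", "false");
        // ... and start from the earliest offset when no committed offset exists.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

Because the base object is copied rather than modified, the same base configuration can still be used unchanged outside of development.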


