FlinkKafkaConsumer lets you consume data from one or more Kafka topics.
Which consumer to use depends on your Kafka version:
- FlinkKafkaConsumer08: uses the old SimpleConsumer API of Kafka. Offsets are handled by Flink and committed to ZooKeeper.
- FlinkKafkaConsumer09: uses the new Consumer API of Kafka, which handles offsets and rebalancing automatically.
- FlinkKafkaProducer010: this connector supports Kafka messages with timestamps, both for producing and consuming (useful for window operations).

The binaries are not part of Flink core, so you need to import them:
<dependency>
  <groupId>org.apache.flink</groupId>
  <!-- match the connector version (0.8 / 0.9 / 0.10) to your Kafka version -->
  <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
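If you build with sbt instead of Maven, an equivalent dependency declaration (a sketch, assuming Scala 2.10 binaries and a `flinkVersion` setting defined in your build) would look like:

```
libraryDependencies += "org.apache.flink" % "flink-connector-kafka-0.9_2.10" % flinkVersion
```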
The constructor takes three arguments: the topic name, a deserialization schema, and a Properties object. The properties must include at least:

- bootstrap.servers: a comma-separated list of Kafka brokers in the form ip:port. For version 0.8, use zookeeper.connect (a list of ZooKeeper servers) instead.
- group.id: the id of the consumer group (see the Kafka documentation for more details).

In Java:
Properties properties = new Properties();
properties.put("group.id", "flink-kafka-example");
properties.put("bootstrap.servers", "localhost:9092");
DataStream<String> inputStream = env.addSource( 
        new FlinkKafkaConsumer09<>(
            kafkaInputTopic, new SimpleStringSchema(), properties));
In Scala:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val inputStream = env.addSource(
        new FlinkKafkaConsumer08[String](
            "topic", new SimpleStringSchema(), properties))
During development, you can set the Kafka properties enable.auto.commit=false and auto.offset.reset=earliest to re-consume the same data every time you launch your program.
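As a minimal sketch, the development-time configuration above can be bundled into a helper that produces the Properties object passed to the consumer constructor (the broker address, group id, and helper name are assumptions for illustration):

```java
import java.util.Properties;

public class DevKafkaProps {

    // Builds consumer properties for local development.
    // localhost:9092 and the group id are placeholder values.
    static Properties devProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-kafka-example");
        // Do not commit offsets automatically, so the group keeps no progress...
        props.setProperty("enable.auto.commit", "false");
        // ...and start from the earliest offset whenever no committed offset exists.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = devProperties();
        System.out.println("auto.offset.reset = "
                + props.getProperty("auto.offset.reset"));
    }
}
```

These properties would then be passed as the third constructor argument, e.g. `new FlinkKafkaConsumer09<>(topic, new SimpleStringSchema(), DevKafkaProps.devProperties())`.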