apache-flink Consume data from Kafka Built-in deserialization schemas


SimpleStringSchema deserializes the message as a string. If your messages have keys, the keys are ignored.

new FlinkKafkaConsumer09<>(kafkaInputTopic, new SimpleStringSchema(), prop);
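The prop argument in these snippets is a plain java.util.Properties object holding the Kafka client configuration. The following is a minimal sketch of how it might be built, assuming a broker at localhost:9092 (the address used in the console-producer example on this page) and a hypothetical consumer group name, flink-demo-group. The class and method names are illustrative only and are not part of Flink or Kafka.

```java
import java.util.Properties;

// Sketch only: builds the `prop` object passed to the consumer constructors on this page.
// KafkaConsumerProps and build(...) are hypothetical names used for illustration.
class KafkaConsumerProps {

    // "bootstrap.servers" and "group.id" are standard Kafka consumer settings;
    // the values supplied by the caller are assumptions for a local test setup.
    static Properties build(String bootstrapServers, String groupId) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", bootstrapServers);
        prop.setProperty("group.id", groupId);
        return prop;
    }

    public static void main(String[] args) {
        Properties prop = build("localhost:9092", "flink-demo-group");
        // Print the configuration that would be handed to the Kafka consumer.
        System.out.println(prop.getProperty("bootstrap.servers"));
        System.out.println(prop.getProperty("group.id"));
    }
}
```

The resulting object can then be passed as the third constructor argument, for example new FlinkKafkaConsumer09<>(kafkaInputTopic, new SimpleStringSchema(), prop).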


JSONDeserializationSchema deserializes JSON-formatted messages using Jackson and returns a stream of com.fasterxml.jackson.databind.node.ObjectNode objects. You can then use the .get("property") method to access fields. As with SimpleStringSchema, keys are ignored.

new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONDeserializationSchema(), prop);


JSONKeyValueDeserializationSchema is very similar to JSONDeserializationSchema, but handles messages whose keys and values are both JSON-encoded.

boolean fetchMetadata = true;
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONKeyValueDeserializationSchema(fetchMetadata), prop);

The ObjectNode returned contains the following fields:

  • key: all the fields present in the key
  • value: all the message fields
  • (optional) metadata: exposes the offset, partition and topic of the message (pass true to the constructor in order to fetch metadata as well).

For example:

kafka-console-producer --broker-list localhost:9092 --topic json-topic \
    --property parse.key=true \
    --property key.separator='|'
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}

This message will be decoded as: