apache-flink Built-in deserialization schemas


Example

SimpleStringSchema

SimpleStringSchema deserializes each message value as a string. If your messages have keys, the keys are ignored.

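// Each Kafka record becomes a String built from the message value; the key is dropped
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(kafkaInputTopic, new SimpleStringSchema(), properties);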
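The "value only, key ignored" behavior can be sketched without Flink or Kafka on the classpath. This is a minimal stdlib-only illustration, not Flink's SimpleStringSchema: RawRecord and SimpleStringSketch are hypothetical names, and decoding the bytes as UTF-8 is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for one Kafka record: raw key and value bytes.
// Defined locally so the sketch compiles without Kafka or Flink.
final class RawRecord {
    final byte[] key;   // may be null when the producer sent no key
    final byte[] value;

    RawRecord(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

class SimpleStringSketch {
    // Only the value bytes are decoded (UTF-8 assumed here);
    // the key is never read, so keyed and unkeyed records look the same.
    static String deserialize(RawRecord record) {
        return new String(record.value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        RawRecord rec = new RawRecord(
                "some-key".getBytes(StandardCharsets.UTF_8),
                "hello, flink".getBytes(StandardCharsets.UTF_8));
        System.out.println(deserialize(rec)); // prints: hello, flink
    }
}
```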

JSONDeserializationSchema

JSONDeserializationSchema deserializes JSON-formatted messages using Jackson and returns a stream of com.fasterxml.jackson.databind.node.ObjectNode objects. You can then access fields with the .get("property") method. As with SimpleStringSchema, keys are ignored.

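// Each record value is parsed into a Jackson ObjectNode; the key is dropped
FlinkKafkaConsumer09<ObjectNode> consumer = new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONDeserializationSchema(), properties);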

JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema is similar to JSONDeserializationSchema, but handles messages whose keys and values are both JSON-encoded.

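// true: also add the record's offset, topic and partition under "metadata"
boolean fetchMetadata = true;
FlinkKafkaConsumer09<ObjectNode> consumer = new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONKeyValueDeserializationSchema(fetchMetadata), properties);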

Each returned ObjectNode contains the following fields:

  • key: the fields of the message key
  • value: the fields of the message value
  • (optional) metadata: the offset, partition and topic of the message; this field is only included if you pass true to the constructor.

For example, consider a keyed message sent with the Kafka console producer (press Ctrl+C to exit the producer):

kafka-console-producer --broker-list localhost:9092 --topic json-topic \
    --property parse.key=true \
    --property "key.separator=|"
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}
^C

With fetchMetadata set to true, this message will be decoded as:

{
    "key":{"keyField1":1,"keyField2":2},
    "value":{"valueField1":1,"valueField2":{"foo":"bar"}},
    "metadata":{
        "offset":43,
        "topic":"json-topic",
        "partition":0
    }
}
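To make the shape of that output concrete, here is a minimal stdlib-only sketch of the wrapping step. It is not Flink's implementation, which parses the key and value with Jackson into an ObjectNode. KeyValueWrapSketch.wrap is a hypothetical helper that splices the key and value JSON text in verbatim (so they must already be valid JSON), omits key or value when its bytes are null (an assumption of this sketch), and does not escape the topic name.

```java
import java.nio.charset.StandardCharsets;

class KeyValueWrapSketch {
    // Builds {"key":..., "value":..., "metadata":{...}} as JSON text.
    // Simplification: key/value text is inserted as-is (trimmed), not parsed.
    static String wrap(byte[] key, byte[] value, boolean fetchMetadata,
                       String topic, int partition, long offset) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        if (key != null) { // assumption: a missing key is simply left out
            sb.append("\"key\":").append(new String(key, StandardCharsets.UTF_8).trim());
            first = false;
        }
        if (value != null) {
            if (!first) sb.append(',');
            sb.append("\"value\":").append(new String(value, StandardCharsets.UTF_8).trim());
            first = false;
        }
        if (fetchMetadata) { // mirrors the constructor flag described above
            if (!first) sb.append(',');
            sb.append("\"metadata\":{")
              .append("\"offset\":").append(offset)
              .append(",\"topic\":\"").append(topic).append('"')
              .append(",\"partition\":").append(partition)
              .append('}');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        byte[] key = "{\"keyField1\":1,\"keyField2\":2}".getBytes(StandardCharsets.UTF_8);
        byte[] value = "{\"valueField1\":1,\"valueField2\":{\"foo\":\"bar\"}}".getBytes(StandardCharsets.UTF_8);
        // fetchMetadata = false: the "metadata" field is left out entirely
        System.out.println(wrap(key, value, false, "json-topic", 0, 43L));
    }
}
```

Passing true instead of false in main adds the "metadata" object with the given offset, topic and partition, matching the layout of the decoded example.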