SimpleStringSchema: SimpleStringSchema deserializes the message as a string. In case your messages have keys, the latter will be ignored.
new FlinkKafkaConsumer09<>(kafkaInputTopic, new SimpleStringSchema(), prop);
JSONDeserializationSchema
JSONDeserializationSchema deserializes json-formatted messages using jackson and returns a stream of com.fasterxml.jackson.databind.node.ObjectNode objects. You can then use the .get("property") method to access fields. Once again, keys are ignored.
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONDeserializationSchema(), prop);
JSONKeyValueDeserializationSchema
JSONKeyValueDeserializationSchema is very similar to the previous one, but deals with messages with json-encoded keys AND values.
boolean fetchMetadata = true;
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONKeyValueDeserializationSchema(fetchMetadata), properties);
The ObjectNode returned contains the following fields:
key: all the fields present in the keyvalue: all the message fieldsmetadata: exposes the offset, partition and topic of the message (pass true to the constructor in order to fetch metadata as well).For example:
kafka-console-producer --broker-list localhost:9092 --topic json-topic \
--property parse.key=true \
--property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}
^C
Will be decoded as:
{
"key":{"keyField1":1,"keyField2":2},
"value":{"valueField1":1,"valueField2":{"foo":"bar"}},
"metadata":{
"offset":43,
"topic":"json-topic",
"partition":0
}
}