SimpleStringSchema: SimpleStringSchema
deserializes the message as a string. In case your messages have keys, the latter will be ignored.
new FlinkKafkaConsumer09<>(kafkaInputTopic, new SimpleStringSchema(), prop);
JSONDeserializationSchema
JSONDeserializationSchema
deserializes json-formatted messages using jackson and returns a stream of com.fasterxml.jackson.databind.node.ObjectNode
objects. You can then use the .get("property")
method to access fields. Once again, keys are ignored.
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONDeserializationSchema(), prop);
JSONKeyValueDeserializationSchema
JSONKeyValueDeserializationSchema
is very similar to the previous one, but deals with messages with json-encoded keys AND values.
boolean fetchMetadata = true;
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONKeyValueDeserializationSchema(fetchMetadata), properties);
The ObjectNode
returned contains the following fields:
key
: all the fields present in the keyvalue
: all the message fieldsmetadata
: exposes the offset
, partition
and topic
of the message (pass true
to the constructor in order to fetch metadata as well).For example:
kafka-console-producer --broker-list localhost:9092 --topic json-topic \
--property parse.key=true \
--property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}
^C
Will be decoded as:
{
"key":{"keyField1":1,"keyField2":2},
"value":{"valueField1":1,"valueField2":{"foo":"bar"}},
"metadata":{
"offset":43,
"topic":"json-topic",
"partition":0
}
}