To use a custom schema, implement the SerializationSchema interface, the DeserializationSchema
interface, or both. The following example implements both:
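import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Assumed package names (Flink 1.4 and later); older releases keep both schema
// interfaces in org.apache.flink.streaming.util.serialization.
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class MyMessageSchema implements DeserializationSchema<MyMessage>, SerializationSchema<MyMessage> {

    // Decode the raw bytes as UTF-8 text and parse them into a MyMessage.
    @Override
    public MyMessage deserialize(byte[] bytes) throws IOException {
        return MyMessage.fromString(new String(bytes, StandardCharsets.UTF_8));
    }

    // Render the message as text and encode it as UTF-8 bytes.
    @Override
    public byte[] serialize(MyMessage myMessage) {
        return myMessage.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Describe the type of the elements this schema produces.
    @Override
    public TypeInformation<MyMessage> getProducedType() {
        return TypeExtractor.getForClass(MyMessage.class);
    }

    // Decides whether the element signals the end of the stream.
    // If true is returned, the element won't be emitted.
    @Override
    public boolean isEndOfStream(MyMessage myMessage) {
        return false;
    }
}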
The MyMessage class is defined as follows:
import java.util.Date;

public class MyMessage {

    public int id;
    public String payload;
    public Date timestamp;

    public MyMessage() {}

    // Parses a record of the form "id,payload,timestamp",
    // where timestamp is given in milliseconds since the epoch.
    public static MyMessage fromString(String s) {
        String[] tokens = s.split(",");
        if (tokens.length != 3) {
            throw new RuntimeException("Invalid record: " + s);
        }
        try {
            MyMessage message = new MyMessage();
            message.id = Integer.parseInt(tokens[0]);
            message.payload = tokens[1];
            // The timestamp is the third field of the record.
            message.timestamp = new Date(Long.parseLong(tokens[2]));
            return message;
        } catch (NumberFormatException e) {
            throw new RuntimeException("Invalid record: " + s, e);
        }
    }

    // Produces the same "id,payload,timestamp" form that fromString() reads.
    @Override
    public String toString() {
        return String.format("%d,%s,%d", id, payload, timestamp.getTime());
    }
}
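A schema that is used for both reading and writing is only useful if whatever serialize() writes can be read back by deserialize(). The following is a minimal stand-alone sketch of such a round-trip check. It does not depend on Flink, and the names RoundTripCheck and Message are hypothetical stand-ins for illustration; it assumes the record format "id,payload,timestamp" with the timestamp in epoch milliseconds and UTF-8 encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.Date;

// Hypothetical helper, not part of Flink: mirrors the schema's two
// conversion methods so the record format can be checked in isolation.
class RoundTripCheck {

    // Minimal stand-in for MyMessage with the same three fields.
    static class Message {
        int id;
        String payload;
        Date timestamp;
    }

    // Same idea as serialize(): write "id,payload,millis" as UTF-8 bytes.
    static byte[] serialize(Message m) {
        String line = m.id + "," + m.payload + "," + m.timestamp.getTime();
        return line.getBytes(StandardCharsets.UTF_8);
    }

    // Same idea as deserialize(): decode UTF-8, split on commas, parse each field.
    static Message deserialize(byte[] bytes) {
        String s = new String(bytes, StandardCharsets.UTF_8);
        String[] tokens = s.split(",");
        if (tokens.length != 3) {
            throw new IllegalArgumentException("Invalid record: " + s);
        }
        Message m = new Message();
        m.id = Integer.parseInt(tokens[0]);
        m.payload = tokens[1];
        m.timestamp = new Date(Long.parseLong(tokens[2]));
        return m;
    }

    public static void main(String[] args) {
        Message in = new Message();
        in.id = 42;
        in.payload = "hello";
        in.timestamp = new Date(1000L);

        Message out = deserialize(serialize(in));
        // prints "42 hello 1000"
        System.out.println(out.id + " " + out.payload + " " + out.timestamp.getTime());
    }
}
```

Because the fields are separated by commas, a payload that itself contains a comma produces more than three tokens and is rejected as an invalid record. If payloads may contain commas, the format needs escaping or a different delimiter.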