The structure of my Flink job is: read data in from Kafka (topic_1_in) -> deserialize messages -> map -> manipulate the data -> build a POJO -> serialize message -> write data out to Kafka (topic_1_out)
I'm now on the last stage where I would like to serialize my POJO. I have found the following example on the Flink website:
DataStream<String> stream = ...

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "my-topic",                  // target topic
        new SimpleStringSchema(),    // serialization schema
        properties,                  // producer config
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

stream.addSink(myProducer);
But I don't understand how to implement a serialization schema for my own type.
I also read about the different serialization options here:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
But I'm still a bit confused about how to convert my POJO to a string to feed the Kafka sink. The class is really simple, so I assume it's quite straightforward.
public class POJO_block {
    public Double id;
    public Double tr_p;
    public Integer size;
    public Double last_info;
    public Long millis_last;
    private ArrayList<Tuple3<Integer, Integer, Integer>> list_val;
}
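To show what I have in mind, here is my rough guess at the serialization logic, hand-rolled with only the JDK so it runs standalone (the Triple stand-in for Flink's Tuple3 and the toJson helper are my own invention; in the real job this logic would presumably live inside an implementation of Flink's SerializationSchema<POJO_block>, and most likely use Jackson's ObjectMapper instead of string building):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PojoJsonSketch {

    // Simplified stand-in for Flink's Tuple3<Integer, Integer, Integer>,
    // just so this sketch compiles without Flink on the classpath.
    static class Triple {
        int f0, f1, f2;
        Triple(int a, int b, int c) { f0 = a; f1 = b; f2 = c; }
    }

    // Same fields as my POJO_block above (list_val made public here
    // so the sketch can fill it directly).
    static class POJO_block {
        public Double id;
        public Double tr_p;
        public Integer size;
        public Double last_info;
        public Long millis_last;
        public List<Triple> list_val = new ArrayList<>();
    }

    // Build a JSON string field by field. In the real job this would
    // more likely be ObjectMapper.writeValueAsBytes(pojo).
    static String toJson(POJO_block b) {
        StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":").append(b.id)
          .append(",\"tr_p\":").append(b.tr_p)
          .append(",\"size\":").append(b.size)
          .append(",\"last_info\":").append(b.last_info)
          .append(",\"millis_last\":").append(b.millis_last)
          .append(",\"list_val\":[");
        for (int i = 0; i < b.list_val.size(); i++) {
            Triple t = b.list_val.get(i);
            if (i > 0) sb.append(",");
            sb.append("[").append(t.f0).append(",")
              .append(t.f1).append(",").append(t.f2).append("]");
        }
        return sb.append("]}").toString();
    }

    // This is what I imagine the schema's serialize() method returning:
    // the UTF-8 bytes that the Kafka sink writes to the topic.
    static byte[] serialize(POJO_block b) {
        return toJson(b).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        POJO_block b = new POJO_block();
        b.id = 1.0; b.tr_p = 0.5; b.size = 2;
        b.last_info = 3.0; b.millis_last = 100L;
        b.list_val.add(new Triple(1, 2, 3));
        System.out.println(toJson(b));
    }
}
```

Is wrapping something like this toJson/serialize pair in a SerializationSchema<POJO_block> and passing that to the FlinkKafkaProducer constructor (instead of SimpleStringSchema) the right approach?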
Any example would be really appreciated.
Thanks.