I have a question regarding the new sourceSinks interface in Flink. I am currently implementing a custom DynamicTableSinkFactory, DynamicTableSink, SinkFunction and OutputFormat, using the JDBC connector as a reference, and I am working in Scala.
All data that is fed into the sink has the type Row, so the OutputFormat serialisation is based on Row:
override def writeRecord(record: Row): Unit = {...}
As stated in the documentation:
records must be accepted as org.apache.flink.table.data.RowData. The framework provides runtime converters such that a sink can still work on common data structures and perform a conversion at the beginning.
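My reading of that passage is that the converter comes from the DynamicTableSink.Context and is applied inside the SinkFunction. Below is a minimal sketch of that reading, not a confirmed solution. It assumes Scala 2.12 (for calling static methods on Java interfaces) and a Flink release that still ships RichSinkFunction. It also assumes the physical row DataType is handed over by the factory. The MyOutputFormat here is only a simplified stand-in for my real OutputFormat, and MyDynamicTableSink is a placeholder name.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.table.connector.{ChangelogMode, RuntimeConverter}
import org.apache.flink.table.connector.sink.{DynamicTableSink, SinkFunctionProvider}
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter
import org.apache.flink.table.data.RowData
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row

// Stand-in for the real OutputFormat: only the Row-based write method matters here.
class MyOutputFormat extends Serializable {
  def writeRecord(record: Row): Unit = println(record)
}

// The sink function accepts RowData and converts each record to Row
// before handing it to the unchanged Row-based output format.
class MySinkFunction(converter: DataStructureConverter, outputFormat: MyOutputFormat)
    extends RichSinkFunction[RowData] {

  override def open(parameters: Configuration): Unit = {
    // Runtime converters have to be opened once before first use.
    converter.open(RuntimeConverter.Context.create(getRuntimeContext.getUserCodeClassLoader))
  }

  // Single-argument invoke keeps the sketch short.
  override def invoke(value: RowData): Unit = {
    // Assumes the data type's conversion class is Row (the default for ROW types).
    val row = converter.toExternal(value).asInstanceOf[Row]
    outputFormat.writeRecord(row)
  }
}

// physicalDataType is assumed to be derived from the table schema in the factory.
class MyDynamicTableSink(physicalDataType: DataType) extends DynamicTableSink {

  // Assumption: the sink only handles inserts.
  override def getChangelogMode(requestedMode: ChangelogMode): ChangelogMode =
    ChangelogMode.insertOnly()

  override def getSinkRuntimeProvider(
      context: DynamicTableSink.Context): DynamicTableSink.SinkRuntimeProvider = {
    // The framework creates the RowData -> external (Row) converter for this type.
    val converter = context.createDataStructureConverter(physicalDataType)
    SinkFunctionProvider.of(new MySinkFunction(converter, new MyOutputFormat))
  }

  override def copy(): DynamicTableSink = new MyDynamicTableSink(physicalDataType)

  override def asSummaryString(): String = "MySink"
}
```

With this layout the conversion would live in a single place (invoke), and writeRecord(record: Row) would stay as it is.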
The goal here is to keep the Row data structure and only convert the incoming RowData into Row when a record enters the SinkFunction. That way the rest of the code does not need to be changed.
class MySinkFunction(outputFormat: MyOutputFormat) extends RichSinkFunction[RowData] with CheckpointedFunction
So the resulting question is: how do I convert RowData into Row when using a DynamicTableSink and an OutputFormat, and where should the conversion happen?
links: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc
Thanks.