
I have a question regarding the new source/sink interfaces in Flink. I am currently implementing a custom DynamicTableSinkFactory, DynamicTableSink, SinkFunction and OutputFormat. I use the JDBC connector as an example, and I write in Scala.

All data that is fed into the sink has the type Row, so the OutputFormat serialisation is based on the Row class:

override def writeRecord(record: Row): Unit = {...}

As stated in the documentation:

records must be accepted as org.apache.flink.table.data.RowData. The framework provides runtime converters such that a sink can still work on common data structures and perform a conversion at the beginning.

The goal here is to keep the Row data structure and only convert the incoming RowData into Row inside the SinkFunction, so that the rest of the code does not need to change.

class MySinkFunction(outputFormat: MyOutputFormat) extends RichSinkFunction[RowData] with CheckpointedFunction 

So the resulting question is: how do I convert RowData into Row when using a DynamicTableSink and an OutputFormat? Where should the conversion happen?

links: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc

Thanks.


1 Answer


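You can obtain a converter instance from the Context that is passed to org.apache.flink.table.connector.sink.DynamicTableSink#getSinkRuntimeProvider. The snippet below is taken from a source example (hence the DeserializationSchema comments), but DynamicTableSink.Context offers the same createTypeInformation and createDataStructureConverter methods: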

    // create type information for the DeserializationSchema
    final TypeInformation<RowData> producedTypeInfo =
            context.createTypeInformation(producedDataType);

    // most of the code in DeserializationSchema will not work on internal data structures
    // create a converter for conversion at the end
    final DataStructureConverter converter =
            context.createDataStructureConverter(producedDataType);

The converter instance is Java serializable, so it can be passed into the sink function through its constructor. You should also call converter.open() in your sink function, typically in its open() method, before converting any records.
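
For the sink side in Scala, this could look roughly like the sketch below. It is untested against a specific Flink release and makes a few assumptions: MyOutputFormat is reduced to a hypothetical trait with only the writeRecord(Row) method from the question, MyDynamicTableSink and consumedDataType are illustrative names, the sink is insert-only, and the CheckpointedFunction part of the original class is left out. It also assumes that consumedDataType is a row type with its default conversion class (Row), that your Scala version can call static methods on Java interfaces, and that SinkFunction.Context is non-generic in your Flink version (on older releases the parameter type would be SinkFunction.Context[_]).

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.table.connector.{ChangelogMode, RuntimeConverter}
import org.apache.flink.table.connector.sink.{DynamicTableSink, SinkFunctionProvider}
import org.apache.flink.table.data.RowData
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row

// Hypothetical stand-in for the existing OutputFormat; only writeRecord(Row)
// is taken from the question. It must be serializable to be shipped to tasks.
trait MyOutputFormat extends Serializable {
  def writeRecord(record: Row): Unit
}

// The sink function receives RowData from the planner and converts each
// record to Row before handing it to the unchanged OutputFormat.
class MySinkFunction(
    outputFormat: MyOutputFormat,
    converter: DynamicTableSink.DataStructureConverter)
  extends RichSinkFunction[RowData] {

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // Initialise the converter once per task with the user-code class loader.
    converter.open(
      RuntimeConverter.Context.create(getRuntimeContext.getUserCodeClassLoader))
  }

  override def invoke(value: RowData, context: SinkFunction.Context): Unit = {
    // toExternal returns Object; for a row type with default conversion it is a Row.
    val row = converter.toExternal(value).asInstanceOf[Row]
    outputFormat.writeRecord(row)
  }
}

// Illustrative DynamicTableSink that creates the converter at planning time
// and passes it into the sink function.
class MyDynamicTableSink(consumedDataType: DataType, outputFormat: MyOutputFormat)
  extends DynamicTableSink {

  // Assumption: this sink only accepts inserts.
  override def getChangelogMode(requestedMode: ChangelogMode): ChangelogMode =
    ChangelogMode.insertOnly()

  override def getSinkRuntimeProvider(
      context: DynamicTableSink.Context): DynamicTableSink.SinkRuntimeProvider = {
    val converter = context.createDataStructureConverter(consumedDataType)
    SinkFunctionProvider.of(new MySinkFunction(outputFormat, converter))
  }

  override def copy(): DynamicTableSink =
    new MyDynamicTableSink(consumedDataType, outputFormat)

  override def asSummaryString(): String = "MySink"
}
```

With this layout the conversion happens in exactly one place (invoke), so the OutputFormat and everything behind it can keep working on Row.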

A more complex example can be found here (for sources but sinks work in a similar way). Have a look at SocketDynamicTableSource and ChangelogCsvFormat in the same package.