0 votes

Suppose we define a custom TableSource and TableSink; how do we integrate them with the SQL Client? Do I need to register the custom TableSource/TableSink by name manually, as below? If not, how does the connector type custom1 map to custom1TableSource?

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
TableSource custom1TableSource = new custom1TableSource();
tableEnv.registerTableSource("custom1", custom1TableSource);

And then configure the environment file like below?

tables:
  - name: custom1TableSource
    type: source
    update-mode: append
    connector:
      property-version: 1
      type: custom1

The source and sink I declared:

package com.abc;

public class custom1TableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes, DefinedProctimeAttribute {
    // ...
}

package com.abc;

public class custom1TableSink implements AppendStreamTableSink<Row> {
    // ...
}

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sqlClient.html#configuration

Update:

After checking the source code, I found that Flink creates the sink and source instances through implementations of StreamTableSourceFactory/StreamTableSinkFactory, and that the factories are discovered via Java's ServiceLoader. However, how do I register the source and sink names to the TableSource and TableSink classes?


2 Answers

0 votes

I found the answer: you need to override requiredContext() and set connector.type manually. Taking Kafka as an example, it assigns "kafka" to connector.type:

public abstract class KafkaTableSourceSinkFactoryBase implements
        StreamTableSourceFactory<Row>,
        StreamTableSinkFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode
        context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
        context.put(CONNECTOR_VERSION(), kafkaVersion()); // version
        context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
        return context;
    }
}
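To illustrate how the SQL Client picks a factory, here is a dependency-free sketch of the matching step: the environment file's connector block is flattened into key/value properties and compared against each factory's requiredContext(). The class name FactoryMatchDemo and the hard-coded custom1 keys are assumptions; the Flink constants (CONNECTOR_TYPE() etc.) resolve to plain keys like connector.type.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch (no Flink dependencies): simulates matching the flattened
// environment-file properties against a factory's requiredContext().
public class FactoryMatchDemo {

    // What a hypothetical custom1 factory's requiredContext() would return.
    static Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("update-mode", "append");
        context.put("connector.type", "custom1");
        context.put("connector.property-version", "1");
        return context;
    }

    // The factory matches if every required key/value pair appears
    // in the flattened properties.
    static boolean matches(Map<String, String> properties) {
        return requiredContext().entrySet().stream()
                .allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
    }

    public static void main(String[] args) {
        // Flattened form of the YAML connector block from the question.
        Map<String, String> props = new HashMap<>();
        props.put("update-mode", "append");
        props.put("connector.type", "custom1");
        props.put("connector.property-version", "1");
        System.out.println(matches(props)); // true: this factory is chosen
    }
}
```

This is why no programmatic registerTableSource call is needed: the mapping from type: custom1 to your source class goes entirely through the factory's requiredContext().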
0 votes

Please have a look at the documentation for user-defined sources & sinks.

Both the SQL Client and the Table & SQL API use so-called TableFactory implementations that are discovered using Java's Service Provider Interface (SPI).
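Concretely, SPI discovery means listing your factory class in a service file on the classpath. A minimal sketch, assuming a Maven-style layout and a hypothetical factory class com.abc.Custom1TableSourceFactory (in Flink 1.6 the service interface is org.apache.flink.table.factories.TableFactory):

```shell
# Register the factory for ServiceLoader discovery.
# The implementation class name below is hypothetical.
mkdir -p src/main/resources/META-INF/services
echo "com.abc.Custom1TableSourceFactory" \
  > src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
```

Once the JAR containing this service file is on the SQL Client's classpath, the factory is found automatically and matched via its requiredContext().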