I see examples that convert a Flink Table object to a DataStream and run StreamExecutionEnvironment.execute.
how would I code + run a continuous query that writes to a Streaming Sink with the table API without converting to a DataStream.
It seems this must be possible, because otherwise what is the purpose of specifying streaming sink Table Connectors?
The Table API docs list continuous queries and dynamic tables, yet most of the actual Java APIs and code examples seem to only use the table API for batch.
EDIT: To show David Anderson what I'm trying, here are the three Flink SQL CREATE TABLE statements on top of analogous Derby SQL tables.
I see the JDBC table connector sink supports streaming, but am I not configuring this correctly? I don't see anything that I'm overlooking. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
FYI, when I get my toy example working, I am planning on using Kafka in production for input/output stream-like data and JDBC/SQL for the lookup table.
CREATE TABLE LookupTableFlink (
`lookup_key` STRING NOT NULL,
`lookup_value` STRING NOT NULL,
PRIMARY KEY (lookup_key) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:derby:memory:myDB;create=false',
'table-name' = 'LookupTable'
),
CREATE TABLE IncomingEventsFlink (
`field_to_use_as_lookup_key` STRING NOT NULL,
`extra_field` INTEGER NOT NULL,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:derby:memory:myDB;create=false',
'table-name' = 'IncomingEvents'
), jdbcUrl);
CREATE TABLE TransformedEventsFlink (
`field_to_use_as_lookup_key` STRING,
`extra_field` INTEGER,
`lookup_key` STRING,
`lookup_value` STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:derby:memory:myDB;create=false',
'table-name' = 'TransformedEvents'
), jdbcUrl);
String sqlQuery =
"SELECT\n" +
" IncomingEventsFlink.field_to_use_as_lookup_key, IncomingEventsFlink.extra_field,\n" +
" LookupTableFlink.lookup_key, LookupTableFlink.lookup_value\n" +
"FROM IncomingEventsFlink\n" +
"LEFT JOIN LookupTableFlink FOR SYSTEM_TIME AS OF IncomingEventsFlink.proctime\n" +
"ON (IncomingEventsFlink.field_to_use_as_lookup_key = LookupTableFlink.lookup_key)\n";
Table joinQuery = tableEnv.sqlQuery(sqlQuery);
// This seems to run, return, and complete and doesn't seem to run in continuous/streaming mode.
TableResult tableResult = joinQuery.executeInsert(TransformedEventsFlink);