I want to consume a Kafka topic into a table using Flink SQL, then convert it back to a DataStream.

Here is the SOURCE_DDL:

CREATE TABLE kafka_source (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'format' = 'json'
)

With Flink's Scala API, I execute the DDL:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val settings = EnvironmentSettings.newInstance().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")

Then I convert it into a DataStream and implement the downstream logic in the map(e => ...) part:

tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)

Inside the map(e => ...) part, I would like to access the column name, in this case last_5_clicks. Why? Because I may have different sources with different column names (such as last_10min_page_view), but I would like to reuse the code in map(e => ...).

Is there a way to do this? Thanks.

What's the point of that? By the map stage you have already converted the data to tuples, so you can access the fields via their index (e._1); you don't really need to know the name of the column they were created from. – Dominik Wosiński

1 Answer


In Flink 1.12 and earlier, the field names can be read from the table's schema via Table.getSchema.getFieldNames. Since Flink 1.13, they can also be obtained from each record via Row.getFieldNames.
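
Below is a minimal sketch of both approaches, building on the setup from the question. The toChangelogStream conversion and the strings produced in map are illustrative assumptions, not part of the original question, and Row.getFieldNames only returns names when the row actually carries them:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

// Flink 1.12 and earlier: capture the field names from the table schema
// once, before the conversion, and close over them in the map function.
val fieldNames: Array[String] = table.getSchema.getFieldNames

tableEnv
  .toRetractStream[(Long, java.sql.Timestamp, String)](table)
  .map(e => s"${fieldNames(2)} = ${e._2._3}") // e = (retractFlag, rowAsTuple)

// Flink 1.13+: with the toChangelogStream conversion each record is a Row,
// which can report its own field names at runtime.
tableEnv
  .toChangelogStream(table)
  .map { row: Row =>
    val names = row.getFieldNames(true) // null for position-based rows
    s"columns = $names"
  }

This keeps the map logic independent of the concrete source: for another table the same code would report last_10min_page_view instead of last_5_clicks.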