
Hi everyone, I want to use a Flink time window in a StreamTableEnvironment.

I have previously used the timeWindow(Time.seconds()) function with a DataStream that comes from a Kafka topic. For external reasons I am converting this DataStream to a Table and applying a SQL query with sqlQuery().

I want to do x-second time window aggregations with SQL and then send the result to another Kafka topic.

Data source:

    val stream = senv
      .addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))

Example of the previous aggregation:

    val windowCounts = stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))

Current Table:

    val tableA = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount)

In this part there should be a query that performs an aggregation every X seconds:

    val result = tableEnv.sqlQuery(
          s"SELECT * FROM $tableA WHERE amount > 2".stripMargin)

More or less my aggregation will be count(y) OVER (PARTITION BY x). Thank you!


1 Answer


Ververica's training for Flink SQL will help you with this. It includes exercises and examples that cover just this kind of query in the section on Querying Dynamic Tables with SQL.

You'll have to establish the source of timing information for each event, which can be either processing time or event time. After that, the query corresponding to stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5)) will be something like this:

    SELECT
      x,
      TUMBLE_END(timestamp, INTERVAL '5' SECOND) AS t,
      COUNT(*) AS cnt
    FROM Events
    GROUP BY
      x, TUMBLE(timestamp, INTERVAL '5' SECOND);
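
Putting that together with the table from the question, a minimal sketch could look like the following, assuming processing time is acceptable; the 'proctime attribute name, the choice of product as the grouping key, and the 5-second interval are illustrative assumptions, not fixed requirements:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    // Expose the stream as a table and append a processing-time attribute ('proctime).
    val events = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount, 'proctime.proctime)

    // Tumbling 5-second window, counting rows per product.
    val result = tableEnv.sqlQuery(
      s"""
         |SELECT
         |  product,
         |  TUMBLE_END(proctime, INTERVAL '5' SECOND) AS wend,
         |  COUNT(*) AS cnt
         |FROM $events
         |GROUP BY product, TUMBLE(proctime, INTERVAL '5' SECOND)
         |""".stripMargin)

    // A tumbling-window aggregate is append-only, so it can be converted back to a
    // DataStream and handed to a FlinkKafkaProducer for the output topic.
    val outStream: DataStream[Row] = tableEnv.toAppendStream[Row](result)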

For details on how to work with time attributes, see the Introduction to Time Attributes.
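
If you need event time instead, the stream has to carry timestamps and watermarks before the table is created. A rough sketch, assuming parsed is a DataStream[(String, String, Int)] and that a millisecond timestamp can be derived from each element (both of these are assumptions, not taken from the question):

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.scala._

    // Allow events to arrive up to 5 seconds out of order (illustrative bound).
    val withTimestamps = parsed.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, String, Int)](Time.seconds(5)) {
        // Derive the event timestamp (milliseconds since the epoch) from the element;
        // how to do this depends on your data, so it is left as a placeholder.
        override def extractTimestamp(element: (String, String, Int)): Long = ???
      })

    // 'rowtime.rowtime turns the stream's timestamps into an event-time attribute.
    val eventsByEventTime =
      tableEnv.fromDataStream(withTimestamps, 'user, 'product, 'amount, 'rowtime.rowtime)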

And for more detailed documentation on windowing with Flink SQL, see the docs on Group Windows.