everyone, I want to use flink time window in StreamTableEnvironment.
I have previously used the timeWindow(Time.seconds()) function with a dataStream that comes from a kafka topic. For external issues I am converting this DataStream to DataTable and applying a SQL query with sqlQuery().
I want to do x second time window aggregations with SQL and then send it to another kafka topic
Data source:
val stream = senv
.addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))
example of previous aggregation:
val windowCounts = stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))
Current DataTable:
val tableA = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount)
In this part there should be a query that makes an aggregation each X time
val result = tableEnv.sqlQuery(
s"SELECT * FROM $tableA WHERE amount > 2".stripMargin)
more or less my aggregation will be count(y) OVER(PARTITION BY x) Thank you!