Hi everyone, I have a Kafka topic as a source and I group it into 1-minute windows. Within each window I want to create new columns using window functions as in SQL, for example:
- SUM(amount) OVER(PARTITION BY
- COUNT(user) OVER(PARTITION BY
- ROW_NUMBER() OVER(PARTITION BY
Can I use DataStream functions for these operations? Or how can I convert my Kafka data to a Table and use sqlQuery?
The destination is another Kafka topic.
val stream = senv
  .addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))
I've tried this:
val tableA = tableEnv.fromDataStream(stream, 'user, 'product, 'amount)
but I get the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
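For context, the source above is read with SimpleStringSchema, so each element is a single String (an atomic type), while fromDataStream is given three field names. A minimal sketch of splitting each line into a typed record first might look like the following. The case class Purchase and the object ParseLine are hypothetical names, and the parser assumes the simple comma-separated format of the test data, with no commas inside the quoted product.

```scala
// Hypothetical record type; field names follow the columns used in the question.
final case class Purchase(user: Int, product: String, amount: Int)

object ParseLine {
  // Parses one line such as 1,"beer",3 into a Purchase.
  // Returns None for lines that do not have exactly three fields
  // or whose numeric fields cannot be converted.
  def parse(line: String): Option[Purchase] =
    line.split(",", -1).map(_.trim) match {
      case Array(u, p, a) =>
        scala.util.Try(
          // Strip the surrounding double quotes from the product field.
          Purchase(u.toInt, p.stripPrefix("\"").stripSuffix("\""), a.toInt)
        ).toOption
      case _ => None
    }
}
```

Assuming the Flink Scala API implicits are imported, something like stream.flatMap(line => ParseLine.parse(line).toList) should then give a stream of Purchase records whose three fields can be named in fromDataStream.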
Test data:
1,"beer",3
1,"beer",1
2,"beer",3
3,"diaper",4
4,"diaper",1
5,"diaper",5
6,"rubber",2
Query example:
SELECT
user, product, amount,
COUNT(user) OVER(PARTITION BY product) AS count_product
FROM table;
Expected output:
1,"beer",3,3
1,"beer",1,3
2,"beer",3,3
3,"diaper",4,3
4,"diaper",1,3
5,"diaper",5,3
6,"rubber",2,1
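To make the intended semantics concrete, here is a small plain-Scala sketch (no Flink involved) of what COUNT(user) OVER(PARTITION BY product) does to the rows collected in one window. The object and method names are hypothetical. In a DataStream job, logic of this kind could presumably live inside a window function that receives all elements of a 1-minute window, but that wiring is not shown here.

```scala
object CountOverPartition {
  // (user, product, amount) rows copied from the test data in the question.
  val rows: Seq[(Int, String, Int)] = Seq(
    (1, "beer", 3), (1, "beer", 1), (2, "beer", 3),
    (3, "diaper", 4), (4, "diaper", 1), (5, "diaper", 5),
    (6, "rubber", 2)
  )

  // Emulates COUNT(user) OVER (PARTITION BY product):
  // every input row is kept and gains the row count of its product group.
  def withCountPerProduct(in: Seq[(Int, String, Int)]): Seq[(Int, String, Int, Long)] = {
    val counts: Map[String, Long] =
      in.groupBy(_._2).map { case (product, group) => product -> group.size.toLong }
    in.map { case (user, product, amount) => (user, product, amount, counts(product)) }
  }

  def main(args: Array[String]): Unit =
    withCountPerProduct(rows).foreach { case (u, p, a, c) =>
      println(s"""$u,"$p",$a,$c""")
    }
}
```

Unlike GROUP BY, this keeps one output row per input row, which is the difference between the window function in the query and a plain per-product aggregate.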
Too many fields referenced from an atomic type
– damjad