
Hi everyone, I have a Kafka topic as a source, which I group into a 1-minute window. Within that window I want to create new columns with window functions as in SQL, for example:

  • SUM(amount) OVER(PARTITION BY …)
  • COUNT(user) OVER(PARTITION BY …)
  • ROW_NUMBER() OVER(PARTITION BY …)

Can I use DataStream functions for these operations? Or:

How can I convert my Kafka data into a Table so that I can use sqlQuery on it?

The destination is another Kafka topic.

    val stream = senv
      .addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))

I've tried this:

val tableA = tableEnv.fromDataStream(stream, 'user, 'product, 'amount)

but I get the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.

Test data

1,"beer",3
1,"beer",1
2,"beer",3
3,"diaper",4
4,"diaper",1
5,"diaper",5
6,"rubber",2

Query example

    SELECT
      user, product, amount,
      COUNT(user) OVER(PARTITION BY product) AS count_product
    FROM table;

Expected output

1,"beer",3,3
1,"beer",1,3
2,"beer",3,3
3,"diaper",4,3
4,"diaper",1,3
5,"diaper",5,3
6,"rubber",2,1
Comments

  • Please post one data sample from Kafka. – damjad
  • Also post the SQL query you want to achieve. – damjad
  • Your stream is of String type and you are trying to access fields from it, which explains your exception: "Too many fields referenced from an atomic type". – damjad
  • Hi, the query is: SELECT user, product, amount, COUNT(user) OVER(PARTITION BY product) AS count_product FROM table; – Danieledu
  • How do I apply the schema so I can convert it to a table? – Danieledu

1 Answer


You need to parse the string into its fields first and rename them afterwards:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Stand-in for your Kafka source.
val stream = env.fromElements("1,beer,3",
  "1,beer,1", "2,beer,3", "3,diaper,4", "4,diaper,1", "5,diaper,5", "6,rubber,2")

// Split each CSV line into a (user, product, amount) tuple.
val parsed = stream.map { x =>
  val arr = x.split(",")
  (arr(0).toInt, arr(1), arr(2).toInt)
}

// Rename the tuple fields so the table gets proper column names.
val tableA = tEnv.fromDataStream(parsed, $"_1" as "user", $"_2" as "product", $"_3" as "amount")

// Example query; `user` is a reserved keyword in Flink SQL, so escape it with backticks.
val result = tEnv.sqlQuery(s"SELECT `user`, product, amount FROM $tableA")

val rs = result.toAppendStream[(Int, String, Int)]

rs.print()
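
Since your destination is another Kafka topic, you can write the result back with a Kafka sink. A minimal sketch, assuming the same properties object as your consumer and a hypothetical output topic named "flink-out":

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

// Serialize each result tuple back to CSV and write it to the output topic.
rs.map(t => s"${t._1},${t._2},${t._3}")
  .addSink(new FlinkKafkaProducer[String]("flink-out", new SimpleStringSchema(), properties))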

I'm not sure how we can express exactly this window function in Flink SQL (more on that below). Alternatively, it can be implemented with the plain DataStream API as follows:

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

parsed.keyBy(x => x._2) // key by product name
      // Processing-time windows need no watermarks for this demo; for the
      // question's 1-minute event-time window, assign timestamps first.
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new ProcessWindowFunction[
        (Int, String, Int), (Int, String, Int, Int), String, TimeWindow
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, Int)],
                             out: Collector[(Int, String, Int, Int)]): Unit = {
          // emit each record with the window's element count,
          // i.e. COUNT(user) OVER(PARTITION BY product)
          val lst = elements.toList
          lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
        }
      })
      .print()

env.execute()
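
For completeness: Flink SQL does support OVER windows on streams, but the OVER clause must be ordered by a time attribute and bounded at the current row, so it produces a running count per partition rather than the per-window total shown above. A rough sketch, assuming a recent Flink version and a processing-time attribute added during conversion:

// Add a processing-time attribute while converting the stream.
val tableB = tEnv.fromDataStream(parsed,
  $"_1" as "user", $"_2" as "product", $"_3" as "amount", $"proctime".proctime())

val overResult = tEnv.sqlQuery(
  s"""SELECT `user`, product, amount,
     |  COUNT(`user`) OVER (
     |    PARTITION BY product ORDER BY proctime
     |    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS count_product
     |FROM $tableB""".stripMargin)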