
I am using Flink 1.12.0. I am trying to convert a DataStream into a table (tableA) and run a SQL query on tableA that aggregates over a window, as shown below. I am using column f2 for the window because it is a timestamp field.

    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

    Properties props = new Properties();

    props.setProperty("bootstrap.servers", BOOTSTRAP_SERVER);
    props.setProperty("schema.registry.url", xxx);
    props.setProperty("group.id", "test");
    props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());

    props.put("client.id", "flink-kafka-example");

    FlinkKafkaConsumer<Avrotest> kafkaConsumer = new FlinkKafkaConsumer<>(
            "test-topic",
            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                    Avrotest.class, prodSchemaRegistryURL),
            props);

    DataStreamSource<Avrotest> stream =
            env.addSource(kafkaConsumer);
    Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"),$("f2"));
    Table result =
            tEnv.sqlQuery("SELECT f0, sum(f1),f2 FROM "
                    + tableA + " GROUP BY TUMBLE(f2, INTERVAL '1' HOUR) ,f1" );

    
    tEnv.toAppendStream(result,user.class).print();

    env.execute("Flink kafka test");
}

When I execute the above code, I get:

    Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(6) encountered.
        at org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:50)
        at org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:81)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)


2 Answers


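To use the Table API for event-time windowing on your DataStream, you first need to assign timestamps and watermarks, and you should do this before calling fromDataStream. When converting the stream, the timestamp field must also be declared as a rowtime attribute; otherwise f2 stays a plain TIMESTAMP(6) column rather than a time attribute, which is exactly what the exception complains about.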
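To make the idea of a watermark concrete, here is a plain-Java sketch (no Flink dependency) of the rule behind `WatermarkStrategy.forBoundedOutOfOrderness`: the watermark trails the largest event timestamp seen so far by the allowed out-of-orderness, minus one millisecond. The class and method names are hypothetical, and in Flink the watermark is emitted periodically rather than computed on demand as it is here.

```java
import java.time.Duration;

// Hypothetical illustration of bounded-out-of-orderness watermarking; not Flink code.
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    // Largest event timestamp seen so far; initialised so the first watermark is Long.MIN_VALUE.
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called for every event with its event-time timestamp in epoch milliseconds.
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    // Watermark: no more events with a timestamp at or below this value are expected.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(Duration.ofSeconds(20));
        long[] eventTimes = {100_000L, 130_000L, 110_000L}; // the last event arrives out of order
        for (long t : eventTimes) {
            wm.onEvent(t);
            // prints watermark=79999, then 109999 twice: the 110000 event is still ahead of the watermark
            System.out.println("event=" + t + " watermark=" + wm.currentWatermark());
        }
    }
}
```

Without any timestamps and watermarks, the planner has nothing like this to drive event time forward, which is why an event-time window cannot be evaluated.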

With Kafka, it's generally best to call assignTimestampsAndWatermarks directly on the FlinkKafkaConsumer, so that watermarks are generated per Kafka partition. See the watermark docs, Kafka connector docs, and Flink SQL docs for more info.
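Calling assignTimestampsAndWatermarks on the consumer makes Flink generate a watermark per Kafka partition and merge them by taking the minimum. The plain-Java sketch below illustrates why that helps, assuming one source instance reading two partitions where one lags behind the other: the lagging partition holds event time back instead of having its records treated as late. Names are hypothetical; this shows the merging rule, not the connector's code.

```java
import java.util.Arrays;

// Hypothetical illustration of per-partition watermarks merged by minimum; not Flink code.
class PerPartitionWatermarkSketch {
    private final long outOfOrdernessMillis;
    private final long[] maxTimestamp; // one slot per partition

    PerPartitionWatermarkSketch(int partitions, long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.maxTimestamp = new long[partitions];
        // No data yet: every partition's watermark starts at Long.MIN_VALUE.
        Arrays.fill(maxTimestamp, Long.MIN_VALUE + outOfOrdernessMillis + 1);
    }

    void onEvent(int partition, long eventTimestampMillis) {
        maxTimestamp[partition] = Math.max(maxTimestamp[partition], eventTimestampMillis);
    }

    // Bounded-out-of-orderness watermark of a single partition.
    long partitionWatermark(int partition) {
        return maxTimestamp[partition] - outOfOrdernessMillis - 1;
    }

    // The source's watermark is the minimum over all partitions.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (int p = 0; p < maxTimestamp.length; p++) {
            min = Math.min(min, partitionWatermark(p));
        }
        return min;
    }

    public static void main(String[] args) {
        PerPartitionWatermarkSketch wm = new PerPartitionWatermarkSketch(2, 20_000L);
        wm.onEvent(0, 200_000L); // partition 0 is far ahead
        wm.onEvent(1, 100_000L); // partition 1 lags behind
        System.out.println(wm.currentWatermark()); // prints 79999
    }
}
```

A single watermark generator placed after the source would instead see the interleaved stream, track 200000 as its maximum, and advance to 179999, so partition 1's events around 100000 would count as late.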


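Three steps:

1. First, define a WatermarkStrategy to pass to assignTimestampsAndWatermarks. Several built-in strategies are available; for example, bounded out-of-orderness:

        WatermarkStrategy<Row> customTime = WatermarkStrategy
                .<Row>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                // assumes f2 holds epoch milliseconds as a Long; adapt the extraction to your record type
                .withTimestampAssigner((event, timestamp) -> (long) event.getField("f2"));

2. Apply the strategy from step 1 to your source:

        // `source` stands for your SourceFunction<Row>, e.g. the Kafka consumer
        DataStream<Row> stream = env.addSource(source).assignTimestampsAndWatermarks(customTime);

3. Declare the table and mark the timestamp field as the rowtime attribute:

        Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"), $("f2").rowtime());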
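Once f2 is a rowtime attribute, `TUMBLE(f2, INTERVAL '1' HOUR)` assigns each row to an epoch-aligned one-hour window and emits that window's result once the watermark reaches the window's last millisecond. The plain-Java sketch below simulates this for a per-key sum, assuming the intended query is roughly `SELECT f0, SUM(f1) ... GROUP BY TUMBLE(f2, INTERVAL '1' HOUR), f0` and a 20-second bounded out-of-orderness as in step 1. It is not Flink's window operator: the watermark is advanced per event for simplicity, timestamps are assumed non-negative, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of an event-time tumbling-window SUM; not Flink code.
class TumblingWindowSumSketch {
    static final long HOUR_MS = 60L * 60 * 1000;

    private final long outOfOrdernessMillis;
    private long maxTimestamp;
    // windowStart -> (key -> running sum); TreeMap keeps windows in time order
    private final TreeMap<Long, Map<String, Long>> openWindows = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    TumblingWindowSumSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Epoch-aligned start of the one-hour window containing ts (assumes ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % HOUR_MS);
    }

    long watermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    void onEvent(String key, long value, long ts) {
        long start = windowStart(ts);
        if (start + HOUR_MS - 1 <= watermark()) {
            return; // the window has already fired: the event is late and dropped
        }
        openWindows.computeIfAbsent(start, s -> new TreeMap<String, Long>())
                   .merge(key, value, Long::sum);
        maxTimestamp = Math.max(maxTimestamp, ts);
        fireReadyWindows();
    }

    // Emit every window whose last millisecond (end - 1) is covered by the watermark.
    private void fireReadyWindows() {
        long wm = watermark();
        while (!openWindows.isEmpty() && openWindows.firstKey() + HOUR_MS - 1 <= wm) {
            Map.Entry<Long, Map<String, Long>> window = openWindows.pollFirstEntry();
            for (Map.Entry<String, Long> e : window.getValue().entrySet()) {
                emitted.add(e.getKey() + "," + e.getValue() + ",windowStart=" + window.getKey());
            }
        }
    }

    public static void main(String[] args) {
        TumblingWindowSumSketch sketch = new TumblingWindowSumSketch(20_000L);
        long minute = 60_000L;
        sketch.onEvent("a", 1, 10 * minute); // 00:10 -> window [00:00, 01:00)
        sketch.onEvent("a", 2, 50 * minute); // 00:50 -> same window
        sketch.onEvent("b", 5, 59 * minute); // 00:59 -> same window
        sketch.onEvent("a", 7, 61 * minute); // 01:01 -> watermark passes 00:59:59.999
        sketch.onEvent("b", 4, 30 * minute); // 00:30 arrives after the window fired -> dropped
        System.out.println(sketch.emitted);  // prints [a,3,windowStart=0, b,5,windowStart=0]
    }
}
```

The 01:01 event pushes the watermark to 01:00:39.999, which covers the end of the [00:00, 01:00) window, so that window fires; the 00:30 event that shows up afterwards is dropped as late.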