
I'm trying to read data from kafka topic into DataStream and register DataStream, after that use TableEnvironment.sqlQuery("SQL") to query the data, when TableEnvironment.execute() there is no error and no output.

public static void main(String[] args){
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
   FlinkKafkaConsumer<Person> consumer = new FlinkKafkaConsumer(
                                                                 new JSONDeserializer(),

  DataStream<Person> stream = env.addSource(consumer).fliter(x -> x.status != -1).assignTimestampAndWatermarks(new AssignerWithPeriodicWatermarks<Person>(){
      long current = 0L;
      final long expire = 1000L;

      public Watermakr getCurrentWatermark(){
         return new Watermark(current - expire);

      public long extractTimestamp(Person person){
         long timestamp = person.createTime;
         current = Math.max(timestamp,current);
         return timestamp;
  //set createTime as rowtime
  Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
  tableEnvironment.toAppendStream(t,Types.Row(new TypeInformation[]{Types.SQL_TIMESTAMP,Types.STRING,Types.LONG})).print();

when i execute,there was nothing print on console or throw any exceptions; but if i use fromCollection() as a source,the program will print something on the console; Can you please guide me to fix this?


  1. flink-streaming-java_2.11 version:1.9.0-csa1.0.0.0;
  2. flink-streaming-scala_2.11 version:1.9.0-csa1.0.0.0;
  3. flink-connector-kafka_2.11 version:1.9.0-csa1.0.0.0;
  4. flink-table-api-java-bridge_2.11 version:1.9.0-csa1.0.0.0;
  5. flink-table-planner_2.11 version:1.9.0-csa1.0.0.0;
I set an breakpoint on return TriggerResult.FIRE; in Element(Object element,long timestamp,TimeWindow window,TriggerContext ctx) at EventTimeTrigger.class , but never been hitIvan Barrios
Maybe narrow down where the problem is, by first checking if there's actually data in the Kafka topic. Something like kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic topic. If that succeeds, then try stream.print().David Anderson
By the way, enableCheckpointing is misspelled.David Anderson
I set an breakpoint on .fliter(x -> x.status != -1) , also i set an breakpoint on TumblingEventTimeWindows.assignWindows(Object element,long timestamp,WindowAssignerContext context),both of x and param:element has valueIvan Barrios

1 Answers


In the code where you convert the SQL query's result back to a DataStream, you need to pass res rather than t to toAppendStream. (I can't see how the code you've posted will even compile — where is t declared?) And I think you should be able to do this

Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");

rather than bothering with the TypeInformation.