I'm trying to read data from a Kafka topic into a DataStream, register the DataStream as a table, and then query it with TableEnvironment.sqlQuery("SQL"). When I call TableEnvironment.execute(), there is no error and no output.
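import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Person, JSONDeserializer and Job are my own classes (not shown).
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(5000);
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
    FlinkKafkaConsumer<Person> consumer = new FlinkKafkaConsumer<>(
        "topic",
        new JSONDeserializer(),
        Job.getKafkaProperties()
    );
    consumer.setStartFromEarliest();
    DataStream<Person> stream = env.addSource(consumer)
        .filter(x -> x.status != -1)
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Person>() {
            long current = 0L;          // highest timestamp seen so far
            final long expire = 1000L;  // watermark lags the highest timestamp by 1 s
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(current - expire);
            }
            @Override
            public long extractTimestamp(Person person, long previousElementTimestamp) {
                long timestamp = person.createTime;
                current = Math.max(timestamp, current);
                return timestamp;
            }
        });
    // set createTime as rowtime
    tableEnvironment.registerDataStream("Table_Person", stream, "name,age,sex,createTime.rowtime");
    Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
    tableEnvironment.toAppendStream(res, Types.ROW(Types.SQL_TIMESTAMP, Types.STRING, Types.LONG)).print();
    tableEnvironment.execute("person-query");
}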
When I run the job, nothing is printed to the console and no exception is thrown. However, if I use fromCollection() as the source, the program does print results. Can you please guide me on how to fix this?
dependencies:
- flink-streaming-java_2.11 version:1.9.0-csa1.0.0.0;
- flink-streaming-scala_2.11 version:1.9.0-csa1.0.0.0;
- flink-connector-kafka_2.11 version:1.9.0-csa1.0.0.0;
- flink-table-api-java-bridge_2.11 version:1.9.0-csa1.0.0.0;
- flink-table-planner_2.11 version:1.9.0-csa1.0.0.0;
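
One thing that may be worth checking is the watermark arithmetic, since an event-time window only emits a result once the watermark passes the end of the window. The sketch below is plain Java with no Flink dependency; it mimics the assigner from the question (watermark = highest timestamp seen minus 1000 ms) and a one-minute tumbling window. The class and method names are made up for illustration, and it assumes createTime holds non-negative epoch milliseconds. It is only a model of the firing rule, not a diagnosis of this particular job.

```java
import java.util.Arrays;
import java.util.List;

class WatermarkSketch {
    static final long WINDOW_MS = 60_000L; // TUMBLE(..., INTERVAL '1' minute)
    static final long EXPIRE_MS = 1_000L;  // same lag as `expire` in the assigner

    // Exclusive end of the tumbling window containing a non-negative timestamp.
    static long windowEnd(long timestamp) {
        return timestamp - (timestamp % WINDOW_MS) + WINDOW_MS;
    }

    // Watermark the assigner would report after seeing these timestamps.
    static long watermarkAfter(List<Long> timestamps) {
        long current = 0L;
        for (long ts : timestamps) {
            current = Math.max(current, ts);
        }
        return current - EXPIRE_MS;
    }

    // An event-time window fires once the watermark reaches windowEnd - 1.
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        List<Long> sameMinute = Arrays.asList(5_000L, 20_000L, 59_000L);
        long end = windowEnd(sameMinute.get(0));

        // All events fall in the first minute: the watermark stops at 58000.
        System.out.println(fires(end, watermarkAfter(sameMinute)));   // false

        // A later event pushes the watermark past the window end.
        List<Long> withLater = Arrays.asList(5_000L, 20_000L, 59_000L, 61_500L);
        System.out.println(fires(end, watermarkAfter(withLater)));    // true

        // A bounded source ends with a Long.MAX_VALUE watermark.
        System.out.println(fires(end, Long.MAX_VALUE));               // true
    }
}
```

If this model applies, it could explain the difference between the two sources: a bounded source such as fromCollection() finishes and Flink then emits a final watermark of Long.MAX_VALUE, which flushes every open window, whereas a Kafka source never finishes, so a window is only emitted after a later record arrives whose timestamp moves the watermark past the window end.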
First check that the topic has data: kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic topic. If that succeeds, then try stream.print(). – David Anderson