How can I use the ingestion time characteristic in Apache Flink? I know we need to set the time characteristic on the environment, but how do I collect the data with timestamps that are treated as ingestion time? Currently, when I use it, the window is processed based on the system clock time. I want the processing to be based on the time at which the data enters the Flink environment.
A small code extract that may help make this clearer:
Time characteristic for the environment:
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Window time:
keyedEvents.timeWindow(Time.minutes(5))
Collection in the source:
ctx.collect(monSourceData);
If data collection starts at, let's say, 11:03, I want the window to end at 11:08, i.e. after 5 minutes. But it ends at 11:05 (somehow behaving like processing time).
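A possible explanation for the 11:05 boundary, sketched in plain Java rather than with the Flink API: as far as I understand, Flink's tumbling time windows are aligned to the epoch (multiples of the window size), not to the timestamp of the first element. The names below (`WindowAlignmentSketch`, `windowStart`, `format`) are hypothetical and only mimic that arithmetic. Times are treated as milliseconds since midnight for readability, and no window offset is assumed.

```java
import java.time.LocalTime;

class WindowAlignmentSketch {

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    // Assumption: no window offset is configured.
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Formats milliseconds since midnight as a wall-clock time (HH:mm).
    public static String format(long millisOfDay) {
        return LocalTime.ofSecondOfDay(millisOfDay / 1000).toString();
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L; // 5-minute window
        long firstElement = LocalTime.of(11, 3).toSecondOfDay() * 1000L; // 11:03

        long start = windowStart(firstElement, size);
        long end = start + size;

        System.out.println(format(start) + " - " + format(end)); // prints "11:00 - 11:05"
    }
}
```

Under this assumption, an element stamped 11:03 lands in the window [11:00, 11:05), so the window would close at 11:05 even when the timestamps themselves are ingestion timestamps.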
Thanks in advance for your help.