0
votes

How can i use the Ingestion time characteristics in Apache flink. I know we need to set the environment time characteristics. But how can i collect the data with timestamps which can be referred as ingestion time. Currently when i am using it, it is processing the window based on system clock time. I want to do the processing based on the time at which data enters the flink environment.

A little code extract which may help to understand it clearly :

Time characteristics for environment :

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

Window time :

keyedEvents.timeWindow(Time.minutes(5))

Collection in source :

ctx.collect(monSourceData);

If the data collection starts at let say 11:03, I want to end it at 11:08 i.e. for 5 minutes. But it stops at 11:05 ( somehow behaving like processing time ).

Thanks in advance for your help.

1

1 Answers

2
votes

Tumbling and sliding windows in Flink are always aligned to the clock (either the event time clock defined by the events and watermarks, or the system clock); time windows are not aligned to first event. So if you have windows that are 5 minutes long, there will be a window for the interval from 11:00 to 11:05, for example, regardless of the TimeCharacteristic.

Tumbling windows do, however, take an optional offset parameter that can be used to shift this alignment. So you could specify TumblingEventTimeWindows.of(Time.minutes(5), Time.minutes(3)), for example to shift the intervals by 3 minutes.