My requirement is to hold 30 days of data in the stream so that any given day's data is available for processing. On the first day, when the Flink application starts, it will fetch 30 days of data from the database and merge it with the current stream data. My challenge is managing the 30-day data window. If I create a sliding window of 30 days with a slide of 1 day, something like:

WatermarkStrategy<EventResponse> wmStrategy = WatermarkStrategy.<EventResponse>forBoundedOutOfOrderness(Duration.ofMillis(1))
        // timestamp assigners must return epoch milliseconds
        .withTimestampAssigner((eventResponse, l) -> eventResponse.getLocalDateTime().toInstant(ZoneOffset.UTC).toEpochMilli());

ds.assignTimestampsAndWatermarks(wmStrategy)
        .windowAll(SlidingEventTimeWindows.of(Time.days(30), Time.days(1)))
        .process(new ProcessAllWindowFunction<EventResponse, Object, TimeWindow>() {

            @Override
            public void process(Context context, Iterable<EventResponse> iterable, Collector<Object> collector) throws Exception {
                // processing logic
            }
        });
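One thing to double-check, independent of the windowing question: Flink's timestamp assigners expect epoch *milliseconds*, while `LocalDateTime.toEpochSecond` returns seconds, so the assigned timestamps would be a factor of 1000 too small and the windows would misalign. A quick plain-`java.time` sanity check (the date here is just an illustrative value):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampUnits {
    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2023, 1, 15, 12, 0); // arbitrary example instant
        long seconds = t.toEpochSecond(ZoneOffset.UTC);         // epoch seconds
        long millis  = t.toInstant(ZoneOffset.UTC).toEpochMilli(); // epoch milliseconds
        // The two values differ by a factor of 1000; Flink wants the second one.
        System.out.println("seconds = " + seconds);
        System.out.println("millis  = " + millis);
    }
}
```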

In this case, process() does not start processing immediately when the first element of historical data is added. My assumptions are: a) by default, the first event will be part of the first window and will be available for processing immediately; b) the next day, the job will remove the oldest day's data from the window. Are my assumptions correct with that piece of code? Thank you for your help.


1 Answer


I don't think your assumptions are correct in this case. When you use a time window with a ProcessAllWindowFunction, the function processes the data only when the window closes (in your case, after 30 days). With a sliding window, the slide means that the second window will contain 29 days of the first window plus a 31st day that was not part of the first window.
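To make the sliding semantics concrete, here is a small stand-alone sketch (plain Java, no Flink dependency) that mirrors the window-assignment arithmetic used by SlidingEventTimeWindows: with a 30-day size and 1-day slide, every element is assigned to 30 overlapping windows, and each of those windows can only fire once the watermark passes its end, never immediately when the element arrives. The "day 100" event timestamp is just an illustrative value.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    static final long DAY = 24L * 60 * 60 * 1000; // one day in milliseconds
    static final long SIZE = 30 * DAY;            // window size: 30 days
    static final long SLIDE = DAY;                // slide: 1 day

    // Sketch of the assignment logic: an element with this timestamp belongs to
    // every window [start, start + SIZE) whose start is a multiple of SLIDE and
    // which still covers the timestamp -- i.e. SIZE / SLIDE = 30 windows.
    static List<long[]> assignWindows(long timestamp) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - ((timestamp % SLIDE + SLIDE) % SLIDE);
        for (long start = lastStart; start > timestamp - SIZE; start -= SLIDE) {
            windows.add(new long[] {start, start + SIZE});
        }
        return windows;
    }

    public static void main(String[] args) {
        long firstEvent = 100 * DAY; // hypothetical first historical event, "day 100"
        List<long[]> windows = assignWindows(firstEvent);
        System.out.println("windows per element: " + windows.size()); // 30
        // The earliest of those windows ends at its start + 30 days; no result
        // containing this event is emitted before the watermark passes that end,
        // so nothing fires the moment the element is ingested.
        long earliestEnd = windows.get(windows.size() - 1)[0] + SIZE;
        System.out.println("earliest window ends at day " + earliestEnd / DAY); // 101
    }
}
```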