Source: Kinesis data stream

Sink: Elasticsearch

I am using AWS services for both.

I am facing an issue with Flink's windowing functions. My job looks like this:

DataStream<TrackingData> input = ...;
input.keyBy(e -> e.getArea())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .reduce(new MyReduceFunction(), new MyProcessWindowFunction())
        .addSink(<elasticsearch sink>);

private static class MyReduceFunction implements ReduceFunction<TrackingData> {
    @Override
    public TrackingData reduce(TrackingData trackingData, TrackingData t1) throws Exception {
        trackingData.setVideoDuration(trackingData.getVideoDuration() + t1.getVideoDuration());
        return trackingData;
    }
}

private static class MyProcessWindowFunction extends ProcessWindowFunction<TrackingData, TrackingData, String, TimeWindow> {
    @Override
    public void process(String key,
                        Context context,
                        Iterable<TrackingData> in,
                        Collector<TrackingData> out) {

        TrackingData trackingIn = in.iterator().next();

        Long videoDuration = 0L;
        for (TrackingData t : in) {
            videoDuration += t.getVideoDuration();
        }
        trackingIn.setVideoDuration(videoDuration);
        out.collect(trackingIn);
    }
}

Sample event:

{"area":"sessions","userId":4450,"date":"2021-12-03T11:00:00","videoDuration":5} 

These events arrive from the Kinesis stream in large volume. I want to sum videoDuration over each 10-second window and then store that single aggregated event in Elasticsearch.

Kinesis can deliver 10,000 events per second. I don't want to store all 10,000 events in Elasticsearch; I just want to store one event for every 10 seconds.
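
To make the expected output concrete, here is a minimal plain-Java sketch (no Flink involved) of the aggregation I am after. The class `TumblingSumSketch`, the `Event` type, and the explicit `timestampMillis` field are hypothetical stand-ins for illustration only; in the real job the window is assigned by Flink from processing time, not from a field on the event.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TumblingSumSketch {

    // Hypothetical stand-in for TrackingData, reduced to the fields used here.
    static final class Event {
        final String area;
        final long timestampMillis; // assumed arrival time, for illustration only
        final long videoDuration;

        Event(String area, long timestampMillis, long videoDuration) {
            this.area = area;
            this.timestampMillis = timestampMillis;
            this.videoDuration = videoDuration;
        }
    }

    static final long WINDOW_MILLIS = 10_000L;

    // Start of the 10-second tumbling window that contains the timestamp.
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    // Sums videoDuration per (area, window); the map key is "area@windowStart".
    static Map<String, Long> sumPerWindow(List<Event> events) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Event e : events) {
            String key = e.area + "@" + windowStart(e.timestampMillis);
            totals.merge(key, e.videoDuration, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("sessions", 1_000L, 5));
        events.add(new Event("sessions", 4_000L, 5));
        events.add(new Event("sessions", 9_999L, 5));
        events.add(new Event("sessions", 10_000L, 5));
        // Three events fall into the window starting at 0, one into the next window.
        System.out.println(sumPerWindow(events)); // {sessions@0=15, sessions@10000=5}
    }
}
```

So four input events should produce only two records in Elasticsearch, one per area per 10-second window.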

The issue is that when I send an event to this job, it is processed quickly and written straight to Elasticsearch. What I want instead is for videoDuration to keep accumulating for the full 10 seconds, and only after those 10 seconds for a single event to be stored in Elasticsearch.

How can I achieve this?