
I have a question.

Based on the timestamp in the class, I would like to implement logic that excludes data once the same user has sent N or more records within one minute.

The UserData class has a timestamp field:

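import com.google.protobuf.Timestamp; // assumed: protobuf Timestamp, since Timestamps.toMillis is used below

class UserData {
    public Timestamp timestamp;
    public String userId;
}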

At first I tried to use a tumbling window.

SingleOutputStreamOperator<UserData> validStream =
                stream.keyBy((KeySelector<UserData, String>) value -> value.userId)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
                        .process(new ValidProcessWindow());

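import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Emits at most validCount elements per key and window; the remaining elements are dropped.
public class ValidProcessWindow extends ProcessWindowFunction<UserData, UserData, String, TimeWindow> {

    private final int validCount = 10; // maximum number of elements kept per window

    @Override
    public void process(String key, Context context, Iterable<UserData> elements, Collector<UserData> out) throws Exception {
        int count = 0;
        for (UserData element : elements) {
            if (count >= validCount) {
                break; // limit reached: skip the rest of this window
            }
            out.collect(element);
            count++;
        }
    }
}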

However, a tumbling processing-time window is computed from the machine's clock, not from the timestamp in the UserData class, so it does not fit my case.

How can I window the stream based on the timestamp field of the UserData class?

Thanks.

Additional Information

I now use code like this:

stream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<UserData>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((event, timestamp) -> Timestamps.toMillis(event.timestamp)))
        .keyBy((KeySelector<UserData, String>) value -> value.userId)
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .process(new ValidProcessWindow());

I ran a test with 150 sample records, where the timestamp of each record increases by 1 second. The result is |1,2,3....59| |60,61....119|. I waited for the last 30 records, but they were never processed. I expected |1,2,3....59| |60,61....119| |120...149|.

How can I get the remaining records?

Self Answer

I found the cause: I used only 150 sample records.

When using event time, Flink cannot advance event time if there are no elements to be processed.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html#idling-sources
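
To see why the last window stayed open, here is a plain-Java simulation (no Flink dependency) of the watermark in this test. It assumes, to my understanding of `forBoundedOutOfOrderness`, that the watermark equals the largest timestamp seen minus the out-of-orderness minus 1 ms, and that a tumbling window `[start, end)` fires once the watermark reaches `end - 1`. Event timestamps 0 s to 149 s stand in for the 150 sample records; `firedWindowStarts` and the dummy range are illustrative choices, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class WatermarkSimulation {

    // Bounded-out-of-orderness watermark after seeing maxTimestamp: max - delay - 1 ms.
    public static long watermark(long maxTimestampMillis, long outOfOrdernessMillis) {
        return maxTimestampMillis - outOfOrdernessMillis - 1;
    }

    // Starts of the windows (among those holding at least one event) whose last
    // millisecond (end - 1) is already covered by the watermark, i.e. windows that fired.
    public static List<Long> firedWindowStarts(List<Long> timestampsMillis,
                                               long windowSizeMillis,
                                               long outOfOrdernessMillis) {
        List<Long> fired = new ArrayList<>();
        if (timestampsMillis.isEmpty()) {
            return fired; // no events, so the watermark never advances
        }
        long maxTs = Long.MIN_VALUE;
        TreeSet<Long> starts = new TreeSet<>(); // sorted, distinct window starts
        for (long ts : timestampsMillis) {
            maxTs = Math.max(maxTs, ts);
            starts.add(ts - Math.floorMod(ts, windowSizeMillis));
        }
        long wm = watermark(maxTs, outOfOrdernessMillis);
        for (long start : starts) {
            if (start + windowSizeMillis - 1 <= wm) {
                fired.add(start);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Long> events = new ArrayList<>();
        for (long s = 0; s < 150; s++) {
            events.add(s * 1000); // 150 sample records, one per second
        }
        // Watermark = 149000 - 1000 - 1 = 147999 < 179999, so [120 s, 180 s) never fires.
        System.out.println(firedWindowStarts(events, 60_000, 1_000)); // prints [0, 60000]

        for (long s = 150; s < 182; s++) {
            events.add(s * 1000); // dummy records that only push the watermark forward
        }
        // Watermark = 181000 - 1000 - 1 = 179999, enough to fire [120 s, 180 s).
        System.out.println(firedWindowStarts(events, 60_000, 1_000)); // prints [0, 60000, 120000]
    }
}
```

Under these assumptions, a dummy record must carry a timestamp of at least `end + outOfOrderness` (181 s here) before the last real window can close.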

So I tested with the 150 sample records followed by dummy records (the timestamp of each dummy record also increases by 1 second).

This time I received the correct output: |1,2,3....59| |60,61....119| |120...149|.

Thank you for your help.


Answer


As far as I understand your problem, you should just use a different time characteristic. Processing time uses the system clock to calculate windows; you should use event time for your application instead. You can find more info about proper usage of event time here.
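
To illustrate what event time changes, here is a plain-Java sketch (not Flink code) in which each record is assigned to a 60-second window computed from its own timestamp rather than the wall clock, and at most `maxPerWindow` records per user per window are kept, as `ValidProcessWindow` does. The `Event` type, the epoch-aligned windows (which, as I understand it, match `TumblingEventTimeWindows` with no offset) and the method names are assumptions for illustration; records are handled in input order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EventTimeWindowSketch {

    // Hypothetical stand-in for UserData, with the timestamp already converted to epoch millis.
    public static final class Event {
        public final String userId;
        public final long timestampMillis;

        public Event(String userId, long timestampMillis) {
            this.userId = userId;
            this.timestampMillis = timestampMillis;
        }
    }

    // Start of the tumbling window containing the given event timestamp (windows aligned to the epoch).
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // Keeps at most maxPerWindow events per (userId, event-time window), in input order.
    public static List<Event> keepFirstN(List<Event> input, long windowSizeMillis, int maxPerWindow) {
        Map<String, Integer> counts = new HashMap<>();
        List<Event> kept = new ArrayList<>();
        for (Event e : input) {
            String key = e.userId + "@" + windowStart(e.timestampMillis, windowSizeMillis);
            int seen = counts.merge(key, 1, Integer::sum);
            if (seen <= maxPerWindow) {
                kept.add(e);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Event> input = new ArrayList<>();
        // One event per second for "alice", with event timestamps 50 s .. 64 s.
        for (int i = 0; i < 15; i++) {
            input.add(new Event("alice", (50 + i) * 1000L));
        }
        // [0 s, 60 s) holds 10 events -> 5 kept; [60 s, 120 s) holds 5 events -> 5 kept.
        System.out.println(keepFirstN(input, 60_000L, 5).size()); // prints 10
    }
}
```

Because the window is derived from `timestampMillis`, replaying the same records later produces the same grouping, which a processing-time window cannot guarantee.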

EDIT: That's how Flink works: there is no data to push the watermark past 150, so the last window is never closed and thus produces no output. You can either use a custom trigger that fires the window even when the watermark does not advance, or inject some data to move the watermark forward.
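
The custom-trigger option can be sketched as a decision rule. This is plain Java, not Flink code; in a real job the same rule would presumably live in a subclass of Flink's `Trigger` that registers an event-time timer at `window.maxTimestamp()` plus a processing-time timer on each element, returning `TriggerResult.FIRE` (or `FIRE_AND_PURGE`) from `onEventTime` / `onProcessingTime`. The idle timeout, the `Decision` enum and the method name are assumptions for illustration.

```java
class TimeoutTriggerSketch {

    public enum Decision { CONTINUE, FIRE }

    // Hypothetical rule: fire when the watermark reaches the window's last millisecond
    // (normal event-time behaviour), or when no element has arrived for
    // idleTimeoutMillis of processing (wall-clock) time.
    public static Decision decide(long windowEndMillis,
                                  long currentWatermark,
                                  long lastArrivalProcTimeMillis,
                                  long nowProcTimeMillis,
                                  long idleTimeoutMillis) {
        if (currentWatermark >= windowEndMillis - 1) {
            return Decision.FIRE;
        }
        if (nowProcTimeMillis - lastArrivalProcTimeMillis >= idleTimeoutMillis) {
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }

    public static void main(String[] args) {
        long windowEnd = 180_000;  // window [120 s, 180 s) from the test
        long watermark = 147_999;  // stuck, because no more events arrive
        long lastArrival = 10_000; // processing time of the last element
        System.out.println(decide(windowEnd, watermark, lastArrival, 20_000, 30_000)); // prints CONTINUE
        System.out.println(decide(windowEnd, watermark, lastArrival, 40_000, 30_000)); // prints FIRE
    }
}
```

One design caveat: if the window fires on the timeout and is not purged, it may fire again once the watermark finally passes its end, so downstream code should tolerate a repeated result.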