1 vote

I want to create an event time clock for my events in Apache Flink. I am doing it in the following way:

import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TimeStampAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, String>> {

    private final long maxOutOfOrderness = 0; // 3.5

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) {
        // Uses the wall clock rather than a timestamp carried by the element.
        currentMaxTimestamp = new Date().getTime();
        return currentMaxTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Emitted periodically; lags behind the last assigned timestamp by maxOutOfOrderness.
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

Please check the above code and tell me if I am doing it correctly. After the event time and watermark assignment I want to process the stream in a process function, in which I will collect the stream data for 10 minutes for different keys.
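
For context, this is roughly how I plan to wire the assigner into the job. The key field, the placeholder source, and the ProcessWindowFunction body are just sketches of my intent, not final code:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class EventTimeJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Placeholder source; in the real job the data comes from elsewhere.
        DataStream<Tuple2<String, String>> source = env.fromElements(Tuple2.of("key1", "payload"));

        source.assignTimestampsAndWatermarks(new TimeStampAssigner())
              .keyBy(value -> value.f0)
              .window(TumblingEventTimeWindows.of(Time.minutes(10)))
              .process(new ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
                  @Override
                  public void process(String key, Context context,
                                      Iterable<Tuple2<String, String>> elements,
                                      Collector<String> out) {
                      // Collect whatever arrived for this key during the 10-minute window.
                      int count = 0;
                      for (Tuple2<String, String> ignored : elements) {
                          count++;
                      }
                      out.collect(key + ": " + count + " events in " + context.window());
                  }
              })
              .print();

        env.execute("event time clock example");
    }
}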


2 Answers

1 vote

No, this is not an appropriate implementation. An event time timestamp should be deterministic (i.e., reproducible), and it should be based on data in the event stream. If instead you use new Date().getTime(), then you are more or less using processing time.

Typically when doing event time processing your events will have a timestamp field, and the timestamp extractor will return the value of this field.
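
As a sketch of what that looks like, the extractor below assumes the second field of your Tuple2 carries an epoch-millisecond timestamp and tolerates 3.5 seconds of out-of-orderness; both the field layout and the bound are assumptions, not something given in your question:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventFieldTimestampAssigner
        extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, String>> {

    public EventFieldTimestampAssigner() {
        super(Time.milliseconds(3500)); // tolerate events up to 3.5 seconds out of order
    }

    @Override
    public long extractTimestamp(Tuple2<String, String> element) {
        // Deterministic: the timestamp comes from the event itself, so replaying
        // the same input produces the same timestamps, watermarks, and results.
        return Long.parseLong(element.f1);
    }
}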

The implementation you've shown will lose most of the benefits that come from working with event time, such as the ability to reprocess historic data in order to reproduce historic results.

0 votes

Your implementation implements ingestion time into the Flink system, not event time. If you consume from Kafka, for example, previousElementTimestamp normally points to the time at which the event was produced to Kafka (unless the Kafka producer set it to something else), which would make your stream processing reproducible.

If you want to implement event time processing in Flink you should rather use timestamps associated with your elements, either contained in the element itself (which makes sense for time series) or stored in Kafka and available via previousElementTimestamp.
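
A minimal sketch of the second option, assuming you read from a FlinkKafkaConsumer so that previousElementTimestamp carries the Kafka record timestamp:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class KafkaTimestampAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, String>> {

    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) {
        // previousElementTimestamp is the timestamp stored with the Kafka record,
        // i.e. the time the event was produced (unless the producer set it explicitly).
        currentMaxTimestamp = Math.max(currentMaxTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // No out-of-orderness allowance here; subtract a bound if your data can arrive late.
        return new Watermark(currentMaxTimestamp);
    }
}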

Regarding maxOutOfOrderness, you probably also want to consider Flink's side-output feature, which makes it possible to get the elements that arrive after the window has fired and to update your Flink job's output.
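
A sketch of what that could look like; the OutputTag name, the 10-minute window, the one-minute allowed lateness, the placeholder reduce, and keying on the first tuple field are all assumptions:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataExample {

    // Tag under which elements arriving after the allowed lateness are emitted.
    static final OutputTag<Tuple2<String, String>> LATE =
            new OutputTag<Tuple2<String, String>>("late-events") {};

    static DataStream<Tuple2<String, String>> lateEventsOf(DataStream<Tuple2<String, String>> withTimestamps) {
        SingleOutputStreamOperator<Tuple2<String, String>> result = withTimestamps
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(10)))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(LATE)
                .reduce((a, b) -> b); // placeholder aggregation; replace with your window logic

        // Late elements can then be used to correct or update the job's earlier output.
        return result.getSideOutput(LATE);
    }
}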

If you consume from Kafka and want a simple event time processing implementation that accepts some data loss, go with AscendingTimestampExtractor. There are some potential problems with AscendingTimestampExtractor that can appear if your data are not ordered within the partition, or if you apply the extractor after an operator rather than directly after the KafkaSource. For a robust industrial use case you should rather implement watermark ingestion into the persistent log storage, as described in the Google Dataflow model.
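
As a sketch, again assuming the second tuple field holds an epoch-millisecond timestamp:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

public class AscendingAssigner extends AscendingTimestampExtractor<Tuple2<String, String>> {

    @Override
    public long extractAscendingTimestamp(Tuple2<String, String> element) {
        // Assumes timestamps never decrease within a partition; violations are
        // handled by the extractor's default violation handler (logged).
        return Long.parseLong(element.f1);
    }
}

Ideally you assign such an extractor on the FlinkKafkaConsumer itself via its assignTimestampsAndWatermarks method, so that watermarks are generated per Kafka partition before the partitions are merged, rather than after a downstream operator.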