I am building a streaming app using Flink 1.3.2 with Scala. The app monitors a folder and streams new files into the pipeline. Each record in a file has an associated timestamp, which I want to use as the event time, generating watermarks with AssignerWithPeriodicWatermarks[T]. My watermark generator looks like this:

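import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[Activity] {
  val maxTimeLag = 6 * 3600000L // 6 hours

  override def extractTimestamp(element: Activity, previousElementTimestamp: Long): Long = {
    // Parse the record's timestamp string (e.g. 2017-09-01T10:15:30+02:00) into epoch millis
    val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")
    format.parse(element.getTimestamp).getTime
  }

  override def getCurrentWatermark(): Watermark = {
    // The watermark trails the wall clock, not the event timestamps
    new Watermark(System.currentTimeMillis() - maxTimeLag)
  }
}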

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(10000L)
val stream = env.readFile(inputformart, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 100)

val activity = stream
      .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
      .map { line =>
        new tuple.Tuple2(line.id, line.count)
      }.keyBy(0).addSink(...)

However, my folder already contains some old data that I don't want to process. The records in the older files have timestamps more than 6 hours old, so they should be behind the watermark. Yet when I start the job, I can still see some initial output being produced. How is the initial value of the watermark set up: is it set before the first interval or after? I may be misunderstanding something here, so I'd appreciate some advice.

There are no operators in the pipeline you've shown that care about time (no windowing, no ProcessFunction timers), so every stream element will pass through unimpeded and be processed. If your goal is to skip late elements, you'll need to introduce something that compares event timestamps to the current watermark.

You could do this by introducing a step between the keyBy and sink, like this:

...
.keyBy(0)
.process(new DropLateEvents())
.addSink(...)

public static class DropLateEvents extends ProcessFunction<...> {
    @Override
    public void processElement(... event, Context context, Collector<...> out) throws Exception {
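        TimerService timerService = context.timerService();
        // Forward the event only if its timestamp is ahead of the current watermark;
        // anything at or behind the watermark is treated as late and dropped
        if (context.timestamp() > timerService.currentWatermark()) {
            out.collect(event);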
        }
    }
}

Having done this, your question about the initial watermark becomes relevant. With periodic watermarks, the initial watermark is Long.MIN_VALUE, so nothing will be considered late until the first watermark is emitted, which will happen after 10 seconds of operation (given how you've set the auto-watermarking interval).
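To make the timing concrete, here is a plain-Java simulation (no Flink dependency) of a periodic assigner like yours feeding the DropLateEvents check. It is only a sketch under simplifying assumptions: the class and method names are hypothetical, the watermark starts at Long.MIN_VALUE, and each periodic tick fires exactly every 10 seconds and computes "tick time - maxTimeLag" the way getCurrentWatermark() would.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java simulation (no Flink dependency) of a periodic watermark assigner
 * feeding a "drop late events" check. All names here are hypothetical.
 */
public class PeriodicWatermarkSimulation {
    static final long MAX_TIME_LAG = 6 * 3_600_000L;  // 6 hours, as in the question
    static final long WATERMARK_INTERVAL = 10_000L;   // setAutoWatermarkInterval(10000L)

    /**
     * arrivalTimes: wall-clock time at which each record reaches the assigner.
     * eventTimes:   the record's own event timestamp.
     * Returns the event timestamps that are NOT dropped as late.
     */
    static List<Long> run(long jobStart, long[] arrivalTimes, long[] eventTimes) {
        long currentWatermark = Long.MIN_VALUE;           // no watermark emitted yet
        long nextTick = jobStart + WATERMARK_INTERVAL;    // first periodic call
        List<Long> kept = new ArrayList<>();
        for (int i = 0; i < eventTimes.length; i++) {
            // Fire every periodic tick due before this record arrives; each tick
            // computes "wall clock - maxTimeLag", like getCurrentWatermark()
            while (nextTick <= arrivalTimes[i]) {
                long candidate = nextTick - MAX_TIME_LAG;
                if (candidate > currentWatermark) {
                    currentWatermark = candidate;         // watermarks only move forward
                }
                nextTick += WATERMARK_INTERVAL;
            }
            // Same comparison as DropLateEvents: keep only records ahead of the watermark
            if (eventTimes[i] > currentWatermark) {
                kept.add(eventTimes[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long start = 1_500_000_000_000L;
        long old = start - 7 * 3_600_000L;                // 7 hours old, so "late"
        long[] arrivals = {start, start + 5_000, start + 15_000, start + 16_000};
        long[] events = {old, old, old, start};
        List<Long> kept = run(start, arrivals, events);
        // The two old records arriving before the first tick slip through
        System.out.println("kept " + kept.size() + " of " + events.length + " records");
        // prints "kept 3 of 4 records"
    }
}
```

The old records that arrive in the first 10 seconds are compared against Long.MIN_VALUE and survive; the one arriving after the first tick is dropped.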

If you want to see in more detail how periodic watermarks are generated, the relevant code is in Flink's TimestampsAndPeriodicWatermarksOperator.

If you want to avoid processing late elements during the first 10 seconds, you could forget about event time and watermarking entirely and modify the processElement method shown above to compare each event's timestamp to System.currentTimeMillis() - maxTimeLag instead of to the current watermark. Another solution would be to use punctuated watermarking and emit a watermark with the very first event.
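The punctuated variant can be sketched in the same plain-Java style. This is a simulation, not Flink code: nextWatermark is a hypothetical stand-in for checkAndGetNextWatermark that returns a watermark for every record. As far as I know, Flink forwards each record before the watermark derived from it, so the sketch does the same; the effect is that only the very first record is still compared against Long.MIN_VALUE.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java simulation (no Flink dependency) of a punctuated assigner that
 * yields a watermark for every record. Names are hypothetical.
 */
public class PunctuatedWatermarkSimulation {
    static final long MAX_TIME_LAG = 6 * 3_600_000L;  // 6 hours

    // Stand-in for checkAndGetNextWatermark(): always "wall clock - maxTimeLag"
    static long nextWatermark(long wallClockNow) {
        return wallClockNow - MAX_TIME_LAG;
    }

    static List<Long> run(long[] arrivalTimes, long[] eventTimes) {
        long currentWatermark = Long.MIN_VALUE;
        List<Long> kept = new ArrayList<>();
        for (int i = 0; i < eventTimes.length; i++) {
            // The late-event check sees the watermark emitted so far
            if (eventTimes[i] > currentWatermark) {
                kept.add(eventTimes[i]);
            }
            // The watermark derived from this record follows it downstream
            long wm = nextWatermark(arrivalTimes[i]);
            if (wm > currentWatermark) {
                currentWatermark = wm;
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long start = 1_500_000_000_000L;
        long old = start - 7 * 3_600_000L;
        long[] arrivals = {start, start + 1, start + 2};
        long[] events = {old, old, start};
        System.out.println("kept " + run(arrivals, events).size() + " of " + events.length + " records");
        // prints "kept 2 of 3 records"
    }
}
```

Compared with the periodic case, the window in which old records slip through shrinks from the first watermark interval to a single record.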

Or even more simply, you could detect and drop late events in a flatMap or filter, since you are defining lateness relative to System.currentTimeMillis() rather than to the watermarks.
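A minimal sketch of that wall-clock check as a plain Java Predicate, assuming the record's timestamp has already been parsed to epoch millis. Activity here is a hypothetical stand-in for the question's type, and the clock is injected so the check can be tested with a fixed "now". In the Flink job, the same condition would be the body of the function passed to the stream's filter.

```java
import java.util.function.LongSupplier;
import java.util.function.Predicate;

/**
 * Wall-clock lateness check, independent of watermarks.
 * Activity is a hypothetical minimal stand-in for the question's type.
 */
public class WallClockLatenessFilter {
    static final long MAX_TIME_LAG = 6 * 3_600_000L;  // 6 hours

    static final class Activity {
        final String id;
        final long timestampMillis;  // already-parsed event timestamp

        Activity(String id, long timestampMillis) {
            this.id = id;
            this.timestampMillis = timestampMillis;
        }
    }

    // Keep a record only if it is newer than (now - maxTimeLag)
    static Predicate<Activity> notLate(LongSupplier clock) {
        return a -> a.timestampMillis > clock.getAsLong() - MAX_TIME_LAG;
    }

    public static void main(String[] args) {
        Predicate<Activity> keep = notLate(System::currentTimeMillis);
        long now = System.currentTimeMillis();
        System.out.println(keep.test(new Activity("fresh", now - 60_000L)));         // true
        System.out.println(keep.test(new Activity("stale", now - 7 * 3_600_000L)));  // false
    }
}
```

Because this check never looks at the watermark, it behaves the same from the first record onward, which is why it sidesteps the initial-watermark question entirely.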