0
votes

We're calculating the max concurrent count for different types of events within a 1-minute tumbling time window.

These events are like sensor data, collected from our desktop agents every minute. However, some agents report a bad timestamp, say, one that is several hours later than now.

So my question is how to handle or drop these events. Currently I just apply a filter(s => s.ct.getTime < now) predicate to exclude them.

My first question: if I don't do this, I suspect a bad "future" event would trigger the window calculation even for windows whose data is still incomplete. Is that right?

My second question: is there a better way to prevent this?

Thanks

1 Answer

2
votes

Interesting use case.

So first some background, then some solutions: windows in Flink do not fire based on timestamps but based on watermarks. The two are closely connected, and it's often okay to treat them as the same when it comes to window firing, but in this case it's important to keep them clearly separated. So yes, your suspicion is probably valid if you use a watermark generator that is strictly bound to the timestamp: a single event from several hours in the future pushes the watermark past all the windows in between, so they fire early, and correctly timestamped events that arrive afterwards are treated as late.
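To make that concrete, here is a toy model in plain Scala. It is not the Flink API, just a sketch of a generator whose watermark is the maximum timestamp seen minus a fixed out-of-orderness bound. The names (WatermarkDemo, lateEvents, and so on) are made up for illustration, and it assumes non-negative millisecond timestamps and no allowed lateness.

```scala
// Toy model, not Flink API: watermark = max timestamp seen - out-of-orderness bound.
object WatermarkDemo {
  val WindowSizeMs = 60000L // 1-minute tumbling windows

  // End (exclusive) of the tumbling window that contains ts (assumes ts >= 0).
  def windowEnd(ts: Long): Long = ts - (ts % WindowSizeMs) + WindowSizeMs

  // Watermark after each event, for a generator strictly bound to the max timestamp.
  def watermarks(timestamps: Seq[Long], outOfOrdernessMs: Long): Seq[Long] =
    timestamps
      .scanLeft(Long.MinValue)((maxTs, ts) => math.max(maxTs, ts))
      .tail
      .map(_ - outOfOrdernessMs)

  // Events that arrive after the watermark has already passed the last
  // timestamp of their window (window end - 1), i.e. after the window fired.
  def lateEvents(timestamps: Seq[Long], outOfOrdernessMs: Long): Seq[Long] = {
    val watermarkBefore = Long.MinValue +: watermarks(timestamps, outOfOrdernessMs).dropRight(1)
    timestamps.zip(watermarkBefore).collect {
      case (ts, wm) if wm >= windowEnd(ts) - 1 => ts
    }
  }
}
```

Feeding it two good events, one event stamped three hours ahead, and then another good event shows the last good event being classified as late, because the future event has already pushed the watermark past its window.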

So with that in mind, you have a few options:

  • Filter invalid events (timestamp > now())
  • Adjust the timestamp (timestamp = min(timestamp, now())), or find out why specific sensors are off (timezone issues?) and fix them at the source
  • Use a more sophisticated watermark generator
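The first two options could look roughly like this. It's a minimal sketch: SensorEvent and its agentId field are hypothetical, only the ct field name comes from your filter predicate, and maxSkewMs is an extra tolerance I'm assuming you might want for small clock differences (pass 0 to get a plain timestamp <= now check).

```scala
import java.util.Date

object FutureTimestamps {
  // Hypothetical event type; only the `ct` field name comes from the question.
  final case class SensorEvent(agentId: String, ct: Date)

  // Option 1: keep only events whose timestamp is at most maxSkewMs ahead of now.
  def isPlausible(e: SensorEvent, nowMs: Long, maxSkewMs: Long): Boolean =
    e.ct.getTime <= nowMs + maxSkewMs

  // Option 2: clamp the timestamp, i.e. timestamp = min(timestamp, now).
  def clamp(e: SensorEvent, nowMs: Long): SensorEvent =
    if (e.ct.getTime > nowMs) e.copy(ct = new Date(nowMs)) else e
}
```

On a DataStream you would apply one of these before assigning timestamps and watermarks, for example stream.filter(e => FutureTimestamps.isPlausible(e, System.currentTimeMillis(), 5000L)) or stream.map(e => FutureTimestamps.clamp(e, System.currentTimeMillis())).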

I think the first two options are straightforward, and I'd personally go for the second (fixing data is always good). Let's focus on the watermark generator.

There is basically no limit on how you generate watermarks - you can rely on your imagination. Here are some ideas:

  • Only advance the watermark once you have seen X events with a timestamp greater than the current watermark.
  • Use some low-pass filter, e.g. a slow-moving average of the event timestamps.
  • Ignore events with timestamp > now() (so filter only for watermark generation).
  • ...
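As a rough illustration of the first and third ideas combined, here is the kind of state such a generator might keep, again in plain Scala rather than against the Flink API. CautiousWatermark, requiredEvents and clock are names I made up, and advancing to the smallest pending timestamp is just one possible policy.

```scala
// Toy model of the generator's state; not Flink API.
// requiredEvents is the "X" from the list above; clock returns "now" in milliseconds.
final class CautiousWatermark(requiredEvents: Int, clock: () => Long) {
  private var watermark = Long.MinValue
  private var pending = Vector.empty[Long] // timestamps seen ahead of the watermark

  def current: Long = watermark

  def onEvent(ts: Long): Unit = {
    // Ignore future timestamps for watermark purposes; the event itself is not dropped here.
    if (ts <= clock() && ts > watermark) {
      pending = pending :+ ts
      // Advance only once enough events lie ahead of the watermark,
      // and then only to the smallest of them.
      if (pending.size >= requiredEvents) {
        watermark = pending.min
        pending = pending.filter(_ > watermark)
      }
    }
  }
}
```

In a real job this logic would presumably live in the onEvent method of a custom WatermarkGenerator, with onPeriodicEmit emitting the current value. A low-pass variant would replace the pending buffer with a moving average of the timestamps.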

I'd be happy to hear which approach you choose, and I can help you further from there.