
In a WindowAssigner, an element is assigned to one or more TimeWindow instances. For a sliding event-time window, this happens in SlidingEventTimeWindows#assignWindows[1].

With size=5 and slide=1, an element with timestamp=0 is assigned to the following windows:

  1. Window(start=0, end=5)
  2. Window(start=-1, end=4)
  3. Window(start=-2, end=3)
  4. Window(start=-3, end=2)
  5. Window(start=-4, end=1)
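To make the list concrete, here is a plain-Scala sketch of the enumeration. It assumes the loop in SlidingEventTimeWindows#assignWindows works as I read it: start from the latest window start aligned to the slide, then step back by `slide` while the window still covers the timestamp. It re-implements that idea without any Flink dependency; `SlidingAssignment` and `lastWindowStart` are hypothetical names, not Flink API.

```scala
// Plain-Scala sketch of sliding event-time window assignment (no Flink needed).
// Assumption: mirrors the loop in Flink's SlidingEventTimeWindows#assignWindows.
object SlidingAssignment {
  // Start of the latest window containing `timestamp`, with windows aligned to `offset`.
  def lastWindowStart(timestamp: Long, offset: Long, slide: Long): Long =
    timestamp - (timestamp - offset + slide) % slide

  // All windows [start, start + size) that contain `timestamp`, latest start first.
  def assignWindows(timestamp: Long, size: Long, slide: Long, offset: Long = 0L): Seq[(Long, Long)] = {
    val windows = Seq.newBuilder[(Long, Long)]
    var start = lastWindowStart(timestamp, offset, slide)
    // Keep stepping back while the window [start, start + size) still covers the timestamp.
    while (start > timestamp - size) {
      windows += ((start, start + size))
      start -= slide
    }
    windows.result()
  }

  def main(args: Array[String]): Unit =
    assignWindows(0L, size = 5L, slide = 1L).foreach { case (s, e) =>
      println(s"Window(start=$s, end=$e)")
    }
}
```

Running `main` prints the same five windows as the list above, from Window(start=0, end=5) down to Window(start=-4, end=1).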

As a diagram:

                            +-> Beginning of time
                            |
                            |
+----------------------------------------------+
|     size = 5              +--+ element       |
|    slide = 1              |                  |
|                           v                  |
| t=[ 0,5[ Window 1         XXXXX              |
| t=[-1,4[ Window 2        XXXXX               |
| t=[-2,3[ Window 3       XXXXX                |
| t=[-3,2[ Window 4      XXXXX                 |
| t=[-4,1[ Window 5     XXXXX                  |
|                                              |
| time(-4 to +4)        ----                   |
|                       432101234              |
+---------------------------+------------------+
                            |
                            |
                            |
                            +

Is there a way to tell Flink that time has a beginning, before which there are no windows? If not, where should I start looking to change that? In the above case, Flink should create only one window (t=[0,5[ Window 1) for the first element, like this:

                            +-> Beginning of time
                            |
                            |
+-----------------------------------------------+
|     size = 5              +--+ element        |
|    slide = 1              |                   |
|                           v                   |
| t=[ 0,5[ Window 1         XXXXX               |
| t=[ 1,6[ Window 2          XXXXX              |
| t=[ 2,7[ Window 3           XXXXX             |
| t=[ 3,8[ Window 4            XXXXX            |
| t=[ 4,9[ Window 5             XXXXX           |
|                                               |
| time(-4 to +8)        ----                    |
|                       4321012345678           |
+---------------------------+-------------------+
                            |
                            |
                            |
                            +

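This restriction only matters at the start of the stream. Once an element is at least size - slide time units after the beginning of time, it is again assigned to size/slide windows; in the above case, every element from t=4 onwards falls into 5 windows.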
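The requested behaviour can be sketched in plain Scala. Flink has no such option, so `beginning` below is an assumed parameter for illustration, and `BoundedSlidingAssignment` is a hypothetical name. Windows are aligned to `beginning` (used as the offset) and any window that would start before it is skipped.

```scala
// Hypothetical "beginning of time" variant of sliding-window assignment.
// Flink has no such option; `beginning` is an assumed parameter for illustration only.
object BoundedSlidingAssignment {
  // Windows [start, start + size) containing `timestamp`, aligned to `beginning`,
  // skipping any window that would start before `beginning`.
  def assignWindows(timestamp: Long, size: Long, slide: Long, beginning: Long): List[(Long, Long)] = {
    // Latest window start <= timestamp (Flink-style alignment with `beginning` as offset).
    val lastStart = timestamp - (timestamp - beginning + slide) % slide
    Iterator
      .iterate(lastStart)(_ - slide) // step back one slide at a time
      .takeWhile(start => start > timestamp - size && start >= beginning)
      .map(start => (start, start + size))
      .toList
  }

  def main(args: Array[String]): Unit =
    for (ts <- 0L to 6L)
      println(s"timestamp=$ts -> ${assignWindows(ts, 5L, 1L, 0L).size} window(s)")
}
```

With size=5 and slide=1, timestamps 0 to 6 get 1, 2, 3, 4, 5, 5 and 5 windows respectively, which matches the second diagram for the first element and the usual 5 windows from t=4 onwards.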


Footnotes:

  1. org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows#assignWindows

2 Answers


At the moment, there is no way to specify the valid time interval of a Flink job. This might also be a bit problematic, given that you might want to run your job on historic data as well.

What you can do, though, is manually filter out the windows that start before the beginning of time:

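import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val startTime = 1      // "beginning of time" (ms); windows starting earlier are dropped
val windowLength = 2   // window size (ms)
val slide = 1          // window slide (ms)

// (value, event timestamp) pairs
val input = env.fromElements((1, 1), (2, 2), (3, 3))
               .assignAscendingTimestamps(x => x._2.toLong)

val windowed = input
      .timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
      .apply { (window: TimeWindow, iterable: Iterable[(Int, Int)], collector: Collector[Int]) =>
         if (window.getStart >= startTime) {
           // Sum the values of windows that start at or after the beginning of time.
           collector.collect(iterable.map(_._1).reduce(_ + _))
         }
         // Windows starting before startTime are discarded: nothing is collected.
       }

windowed.print()

env.execute()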
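As a sanity check of what this job should emit, here is a plain-Scala simulation with no Flink dependency. It assumes windows are assigned with the same aligned-start loop Flink uses for sliding event-time windows and that the values are summed per window; `EarlyWindowFilterSimulation` is a hypothetical name. With startTime = 1, the window [0, 2) is dropped and the windows [1, 3), [2, 4) and [3, 5) should yield the sums 3, 5 and 3.

```scala
// Plain-Scala simulation (no Flink) of the early-window filter on the sample input.
// Assumption: window assignment follows Flink's aligned sliding-window loop (offset 0).
object EarlyWindowFilterSimulation {
  // Returns the (window, sum) pairs the filter keeps, ordered by window end.
  def run(): List[((Long, Long), Int)] = {
    val startTime = 1L
    val windowLength = 2L
    val slide = 1L
    val input = List((1, 1L), (2, 2L), (3, 3L)) // (value, event timestamp)

    // Assign each element to every window [start, start + windowLength) containing it.
    val assigned: List[((Long, Long), Int)] = for {
      (value, ts) <- input
      start <- Iterator.iterate(ts - (ts + slide) % slide)(_ - slide)
                 .takeWhile(_ > ts - windowLength)
                 .toList
    } yield ((start, start + windowLength), value)

    assigned
      .groupBy(_._1)                                                     // elements per window
      .map { case (window, members) => (window, members.map(_._2).sum) } // sum per window
      .toList
      .sortBy { case ((_, end), _) => end }                              // order by window end
      .filter { case ((start, _), _) => start >= startTime }             // drop early windows
  }

  def main(args: Array[String]): Unit =
    run().foreach { case ((start, end), sum) => println(s"[$start, $end): $sum") }
}
```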

I might have found a better workaround for this issue. The idea is to set the first watermark far enough into the future that there is enough data for your windows. The early windows still exist, but they are discarded.

Here is a proof of concept for an AssignerWithPeriodicWatermarks[T]:

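  import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
  import org.apache.flink.streaming.api.watermark.Watermark

  // `wait`: how far ahead of the first timestamp the first watermark is placed.
  class WMG[T](wait: Long) extends AssignerWithPeriodicWatermarks[T] {
    var t: Option[Long] = None   // most recently seen timestamp
    var firstTime = true         // true until the first watermark is emitted

    // prevTs is the timestamp the element already carries (e.g. set by the source);
    // this assumes such a timestamp exists and simply keeps it.
    override def extractTimestamp(el: T, prevTs: Long): Long = {
      t = Some(prevTs)
      prevTs
    }

    override def getCurrentWatermark(): Watermark = (t, firstTime) match {
      case (None, _) => null   // no element seen yet: emit no watermark
      case (Some(v), false) => new Watermark(v)
      case (Some(v), true) =>
        // First watermark: jump `wait` ahead of the first timestamp.
        firstTime = false
        new Watermark(v + wait)
    }
  }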
`wait` is the size of your first window. It seems to work correctly, but I don't understand Flink well enough to be sure.

Update: Unfortunately, it doesn't work (and now I don't see why it should): there are always a few keys in the keyed stream that still get "early windows". So in the end I'm just filtering out the wrong windows with something like:

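// s = number of windows each element is assigned to (size / slide)
val s = (winSize/winStep).intValue
kstream.flatMapWithState((in: StreamOut, state: Option[Int]) =>
  state match {
    case None              => (Seq(), Some(1))      // first result for this key: drop it
    case Some(n) if n == s => (Seq(in), Some(s))    // past the first s results: emit
    case Some(n)           => (Seq(), Some(n + 1))  // still within the first s results: drop it
  })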
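To see which results the counting filter keeps, the same state machine can be run over one key's sequence of window results in plain Scala, with foldLeft standing in for flatMapWithState (which keeps one such Option[Int] state per key). This is an illustration under that assumption; `CountingFilterSimulation`, `step` and `run` are hypothetical names. The filter drops the first s results of a key and passes everything after that through.

```scala
// Plain-Scala simulation of the counting filter for a single key.
// Assumption: foldLeft stands in for flatMapWithState's per-key state.
object CountingFilterSimulation {
  // One step of the filter: same logic as the flatMapWithState function.
  def step[T](s: Int)(in: T, state: Option[Int]): (Seq[T], Option[Int]) =
    state match {
      case None              => (Seq(), Some(1))     // first result: drop
      case Some(n) if n == s => (Seq(in), Some(s))   // past the first s results: emit
      case Some(n)           => (Seq(), Some(n + 1)) // still within the first s: drop
    }

  // Feed one key's window results through the filter and collect what it emits.
  def run[T](s: Int, results: Seq[T]): Seq[T] =
    results.foldLeft((Seq.empty[T], Option.empty[Int])) { case ((emitted, state), in) =>
      val (out, next) = step(s)(in, state)
      (emitted ++ out, next)
    }._1

  def main(args: Array[String]): Unit =
    println(run(3, 1 to 6))
}
```

For example, with s = 3 and results 1 to 6, only 4, 5 and 6 are emitted.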