
I have data in Kafka that looks like this:

{
  "account": "iOS", //Possible values: iOS, android, web, windows
  "events": [
    {
      "timestamp": "2017-07-03T20:19:35Z"
    }
  ]
}

The timestamps range from 2017-07-03T20:19:35Z to 2017-07-03T20:22:30Z (about 3 minutes). I have this Flink program that ingests the data from the Kafka topic above:

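import java.util.Properties

import com.google.gson.JsonParser
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
// Flink 1.3 location; newer releases use org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object TestWindow {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(...)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.addSource(new FlinkKafkaConsumer010[String]("topic", new SimpleStringSchema(), props))
      // Partition the stream by the "account" field (iOS, android, web, windows)
      .keyBy { jsonStr =>
        val jsonObject = new JsonParser().parse(jsonStr).getAsJsonObject()
        jsonObject.get("account").getAsString
      }
      // 10-second tumbling event-time windows, evaluated separately for each key
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .reduce { (v1, v2) =>
        println(v1 + " " + v2)
        ""
      }
    env.execute()
  }
}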

I am having trouble understanding how windowing works with the keyBy operation.

I understand that the keyBy operation above creates different partitions, each containing elements with the same key. But I am confused about when the windows are created and how the elements of each partition are assigned to windows.

My guess is that as each JSON object arrives, it is keyed by the keyBy operation, and then a 10-second window is created based on that object's event timestamp?

So, for example, if a JSON object with a timestamp of "2017-07-03T20:19:35Z" and an account type of iOS arrives, will a keyed partition for iOS be created, along with a window from "2017-07-03T20:19:35Z" to "2017-07-03T20:19:45Z"? Or is a window created every 10 seconds regardless of the event timestamp of the JSON object from Kafka?


Answer


Your assumption about keyBy is correct. keyBy partitions the stream on the defined key attribute(s) and windows are computed per key.
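To make "windows are computed per key" concrete, here is a minimal plain-Scala sketch (no Flink dependency) that models window state as being scoped by the pair (key, window start). The names `KeyedWindowSketch`, `Event`, `windowStart`, and `assign` are hypothetical and only illustrate the idea; this is not how Flink stores its state internally.

```scala
import java.time.Instant

// Hypothetical model of keyed window state; an illustration, not Flink's API.
object KeyedWindowSketch {
  final case class Event(account: String, timestamp: String)

  // Start of the tumbling window containing ts, aligned to the epoch (no offset).
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Records are grouped by (key, windowStart): every key gets its own windows.
  def assign(events: Seq[Event], sizeMs: Long): Map[(String, Long), Seq[Event]] =
    events.groupBy { e =>
      val ts = Instant.parse(e.timestamp).toEpochMilli
      (e.account, windowStart(ts, sizeMs))
    }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("iOS", "2017-07-03T20:19:35Z"),
      Event("android", "2017-07-03T20:19:36Z"),
      Event("iOS", "2017-07-03T20:19:38Z"),
      Event("iOS", "2017-07-03T20:19:41Z")
    )
    val state = assign(events, 10000L)
    state.toSeq.sortBy(_._1).foreach { case ((acct, start), evs) =>
      println(s"$acct ${Instant.ofEpochMilli(start)} -> ${evs.size} event(s)")
    }
  }
}
```

Running `main` prints one line per (account, window) pair: the android and iOS records from 20:19:35 to 20:19:38 land in separate windows that share the same borders, and the iOS record at 20:19:41 goes into the next iOS window.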

The TumblingEventTimeWindows assigner that you are using in your example has fixed window borders, i.e., the borders do not depend on the timestamps of your data. A 10-second tumbling window will create windows [00:00:00.000, 00:00:10.000), [00:00:10.000, 00:00:20.000), and so on. Records that arrive at the window operator are assigned to the window that contains their timestamp, so your iOS record with timestamp 2017-07-03T20:19:35Z goes into the iOS window [20:19:30, 20:19:40), not into a window starting at 20:19:35. When the window is closed (i.e., when the operator's event-time clock, its watermark, passes the end timestamp of the window), the result of the window is computed and emitted. Note that windows are only instantiated with the first record, i.e., empty windows do not trigger a computation and cannot emit data, such as a zero count.
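The following plain-Scala sketch shows the three points above: fixed borders, assignment by timestamp, and firing once the watermark reaches the end of a window. It assumes a window offset of 0 and treats `end - 1` as the last timestamp a window contains, which matches how Flink's `TimeWindow` and event-time trigger behave as far as I know; the object and method names (`TumblingWindowSketch`, `assignWindow`, `fired`) are hypothetical and this is not Flink code.

```scala
import java.time.Instant

// Hypothetical sketch of tumbling event-time windows (offset 0); not Flink code.
object TumblingWindowSketch {
  final case class Window(start: Long, end: Long) {
    // Largest timestamp that still belongs to the window [start, end).
    def maxTimestamp: Long = end - 1
    override def toString: String =
      s"[${Instant.ofEpochMilli(start)}, ${Instant.ofEpochMilli(end)})"
  }

  // Borders are multiples of the window size, independent of the data.
  def assignWindow(ts: Long, size: Long): Window = {
    val start = ts - Math.floorMod(ts, size)
    Window(start, start + size)
  }

  // Only windows that received a record exist; each fires once the
  // watermark reaches its max timestamp.
  def fired(windows: Set[Window], watermark: Long): Set[Window] =
    windows.filter(w => watermark >= w.maxTimestamp)

  def main(args: Array[String]): Unit = {
    val size = 10000L
    val timestamps = Seq("2017-07-03T20:19:35Z", "2017-07-03T20:19:58Z")
      .map(s => Instant.parse(s).toEpochMilli)
    val windows = timestamps.map(assignWindow(_, size)).toSet
    windows.toSeq.sortBy(_.start).foreach(println)
    // No record fell into [20:19:40, 20:19:50), so that window never exists.
    val watermark = Instant.parse("2017-07-03T20:19:50Z").toEpochMilli
    println(s"fired at watermark 20:19:50: ${fired(windows, watermark)}")
  }
}
```

With a watermark of 20:19:50, only the [20:19:30, 20:19:40) window fires; the [20:19:50, 20:20:00) window waits for a later watermark, and the empty window in between is never created.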

Other window types, such as session windows, have data-driven boundaries. For a session window, records that are no more than a configured gap apart from each other are grouped into the same window.
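For contrast with the fixed tumbling borders, here is a minimal plain-Scala sketch of gap-based sessionization for the records of a single key. It starts a new session whenever two consecutive timestamps are more than `gapMs` apart; Flink's own session assigner works by merging per-record windows and its exact boundary handling may differ, so treat this only as an illustration. `SessionWindowSketch`, `Session`, and `sessions` are hypothetical names.

```scala
// Hypothetical sketch of gap-based sessions for one key; not Flink's
// EventTimeSessionWindows implementation.
object SessionWindowSketch {
  final case class Session(start: Long, lastTs: Long, count: Int)

  // Extend the current session if the next timestamp is within gapMs of the
  // previous one; otherwise open a new session.
  def sessions(timestamps: Seq[Long], gapMs: Long): List[Session] =
    timestamps.sorted.foldLeft(List.empty[Session]) {
      case (current :: done, ts) if ts - current.lastTs <= gapMs =>
        current.copy(lastTs = ts, count = current.count + 1) :: done
      case (acc, ts) =>
        Session(ts, ts, 1) :: acc
    }.reverse

  def main(args: Array[String]): Unit = {
    // Timestamps in milliseconds: a burst at 0-4 s and another at 30-31 s.
    val ts = Seq(0L, 2000L, 4000L, 30000L, 31000L)
    sessions(ts, gapMs = 5000L).foreach(println)
  }
}
```

Here the borders come entirely from the data: the first session spans 0 to 4 s and the second 30 to 31 s, and changing the timestamps moves the windows.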