I am trying to write my own WindowFunction, but I cannot get it to compile and cannot figure out why. The problem is in the apply call on the windowed stream: it does not accept MyWindowFunction as a valid argument. The data I am streaming contains (timestamp, x, y), where x and y take the values 0 and 1 for testing. extractTupleWithoutTs simply returns the tuple (x, y). The same code runs successfully with simple sum and reduce functions. Grateful for any help :) Using Flink 1.3.

Imports:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

Rest of the code:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = env.socketTextStream("localhost", 9999).assignTimestampsAndWatermarks(new TsExtractor)
val tuple = text.map( str => extractTupleWithoutTs(str))
val counts = tuple.keyBy(0).timeWindow(Time.seconds(5)).apply(new MyWindowFunction())
counts.print()
env.execute("Window Stream")

MyWindowFunction, which is basically copied from the example with the types changed:

class MyWindowFunction extends WindowFunction[(Int, Int), Int, Int, TimeWindow] {
  // Emits the number of elements in each window.
  def apply(key: Int, window: TimeWindow, input: Iterable[(Int, Int)], out: Collector[Int]): Unit = {
    var count = 0
    for (in <- input) {
      count = count + 1
    }
    out.collect(count)
  }
}

1 Answer

The problem is the third type parameter of the WindowFunction, i.e., the type of the key. The key is declared by field position in the keyBy method (keyBy(0)), so the type of the key cannot be determined at compile time, and the keyed stream's key type is the generic org.apache.flink.api.java.tuple.Tuple rather than Int. The same problem arises if you declare the key as a string, e.g., keyBy("f0").
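To make the mismatch concrete, here is a minimal sketch of the type the compiler assigns to a stream keyed by position. The object and method names (KeyTypeSketch, keyedByIndex) are made up for illustration; only keyBy, DataStream and KeyedStream come from the Flink Scala API.

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._

object KeyTypeSketch {
  // keyBy with a field position: the key type of the resulting stream is the
  // generic Java Tuple, not Int, because the compiler cannot tell which field
  // type position 0 refers to.
  def keyedByIndex(s: DataStream[(Int, Int)]): KeyedStream[(Int, Int), Tuple] =
    s.keyBy(0)
}
```

A WindowFunction whose third type parameter is Int therefore does not match what apply expects on this stream.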

There are two options to resolve this:

  1. Use a KeySelector function in keyBy to extract the key (something like keyBy(_._1)). The return type of the KeySelector function is known at compile time, so you can use a correctly typed WindowFunction with an Int key.
  2. Change the third type parameter of the WindowFunction to org.apache.flink.api.java.tuple.Tuple, i.e., WindowFunction[(Int, Int), Int, org.apache.flink.api.java.tuple.Tuple, TimeWindow]. Tuple is a generic holder for the keys extracted by keyBy. In your case it will be an org.apache.flink.api.java.tuple.Tuple1. In WindowFunction.apply() you can cast the Tuple to Tuple1 and access the key field as Tuple1.f0.
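Both options could look roughly like the sketch below. The class and object names (CountByIntKey, CountByTupleKey, WindowKeySketch) are hypothetical and only mirror the question's counting function; this is an illustration of the two typings, not a tested drop-in replacement.

```scala
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Option 1: key type Int, to be used together with keyBy(_._1).
class CountByIntKey extends WindowFunction[(Int, Int), Int, Int, TimeWindow] {
  override def apply(key: Int, window: TimeWindow,
                     input: Iterable[(Int, Int)], out: Collector[Int]): Unit =
    out.collect(input.size)
}

// Option 2: key type Tuple, to be used together with keyBy(0).
class CountByTupleKey extends WindowFunction[(Int, Int), Int, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow,
                     input: Iterable[(Int, Int)], out: Collector[Int]): Unit = {
    // A single-field key arrives wrapped in a Tuple1; cast to read it.
    // The value is not used further here, it only illustrates the access.
    val k: Int = key.asInstanceOf[Tuple1[Int]].f0
    out.collect(input.size)
  }
}

object WindowKeySketch {
  // Wires each function to the matching keyBy variant; tuples stands in
  // for the question's stream of (x, y) pairs.
  def wire(tuples: DataStream[(Int, Int)]): (DataStream[Int], DataStream[Int]) = {
    val viaSelector = tuples.keyBy(_._1).timeWindow(Time.seconds(5)).apply(new CountByIntKey)
    val viaIndex = tuples.keyBy(0).timeWindow(Time.seconds(5)).apply(new CountByTupleKey)
    (viaSelector, viaIndex)
  }
}
```

Option 1 keeps the key statically typed, which avoids the cast; option 2 leaves the existing keyBy(0) call untouched.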