3
votes

I want to use a ProcessWindowFunction in my Apache Flink project, but I get a compile error when I pass it to process(). See the code snippets below.

The error is:

The method process(ProcessWindowFunction,R,Tuple,TimeWindow>) in the type WindowedStream,Tuple,TimeWindow> is not applicable for the arguments (JDBCExample.MyProcessWindows)

My program:

DataStream<Tuple2<String, JSONObject>> inputStream;

inputStream = env.addSource(new JsonArraySource());

inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());

My ProcessWindowFunction:

private class MyProcessWindows 
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{

  public void process(
      String key, 
      Context context, 
      Iterable<Tuple2<String, JSONObject>> input, 
      Collector<Tuple2<String, String>> out) throws Exception 
  {
    ...
  }

}
2
Can you double-check the error message? It seems as if something is missing in the signature of the process() method. - Fabian Hueske

2 Answers

5
votes

The problem is probably the generic types of the ProcessWindowFunction.

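You are referencing the key by position (keyBy(0)). The compiler therefore cannot infer its type (String), and the key type of the keyed stream becomes Tuple. The window type must also match the one produced by the window assigner, which is TimeWindow for TumblingEventTimeWindows. You need to change the ProcessWindowFunction to:

private class MyProcessWindows 
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, TimeWindow>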

By replacing String with Tuple, you now have a generic placeholder for the key, which you can cast to Tuple1<String> when you need to access the key in the process() method:

public void process(
    Tuple key, 
    Context context, 
    Iterable<Tuple2<String, JSONObject>> input, 
    Collector<Tuple2<String, String>> out) throws Exception {

  String sKey = (String)((Tuple1)key).f0;
  ...
}

You can avoid the cast and use the proper type if you define a KeySelector<IN, KEY> function to extract the key, because the return type KEY of the KeySelector is known to the compiler.
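To illustrate why the key type differs between the two approaches, here is a minimal plain-Java sketch. It does not use Flink at all: Pair, PositionKey, WindowFn, Keyed, keyByPosition and keyBySelector are hypothetical stand-ins that only mirror the generic shapes of Tuple2, Tuple, ProcessWindowFunction, the windowed stream and the two keyBy variants (the window type parameter is left out for brevity). Treat it as a rough model of the type inference, not as Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Plain-Java stand-ins (NOT Flink classes); they only mirror the generic shapes.
class KeyTypeSketch {

    // Stand-in for a two-field tuple record.
    static final class Pair {
        final String f0;
        final String f1;
        Pair(String f0, String f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Stand-in for an untyped Tuple key: the field type is invisible to the compiler.
    static final class PositionKey {
        final Object f0;
        PositionKey(Object f0) { this.f0 = f0; }
        @Override public boolean equals(Object o) {
            return o instanceof PositionKey && Objects.equals(f0, ((PositionKey) o).f0);
        }
        @Override public int hashCode() { return Objects.hashCode(f0); }
    }

    // Stand-in for ProcessWindowFunction<IN, OUT, KEY, W>, window parameter omitted.
    interface WindowFn<IN, OUT, KEY> {
        OUT process(KEY key, List<IN> input);
    }

    // Stand-in for a keyed stream: K is fixed by how the key was chosen.
    static final class Keyed<K> {
        private final Map<K, List<Pair>> groups = new LinkedHashMap<>();

        void add(K key, Pair value) {
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }

        // Accepts only functions whose KEY parameter is exactly K.
        <R> List<R> process(WindowFn<Pair, R, K> fn) {
            List<R> out = new ArrayList<>();
            for (Map.Entry<K, List<Pair>> e : groups.entrySet()) {
                out.add(fn.process(e.getKey(), e.getValue()));
            }
            return out;
        }
    }

    // Key by position: the return type can only promise the untyped PositionKey.
    static Keyed<PositionKey> keyByPosition(List<Pair> items) {
        Keyed<PositionKey> keyed = new Keyed<>();
        for (Pair p : items) keyed.add(new PositionKey(p.f0), p);
        return keyed;
    }

    // Key by selector: K is taken from the selector's return type.
    static <K> Keyed<K> keyBySelector(List<Pair> items, Function<Pair, K> selector) {
        Keyed<K> keyed = new Keyed<>();
        for (Pair p : items) keyed.add(selector.apply(p), p);
        return keyed;
    }

    // Declares KEY = String, like MyProcessWindows in the question.
    static final class CountPerKey implements WindowFn<Pair, String, String> {
        @Override public String process(String key, List<Pair> input) {
            return key + "=" + input.size();
        }
    }

    // Declares KEY = PositionKey and casts the field when it needs the value.
    static final class CountPerUntypedKey implements WindowFn<Pair, String, PositionKey> {
        @Override public String process(PositionKey key, List<Pair> input) {
            return (String) key.f0 + "=" + input.size();
        }
    }

    static List<String> countWithSelector(List<Pair> items) {
        // K is inferred as String, so the String-keyed function is accepted.
        return keyBySelector(items, p -> p.f0).process(new CountPerKey());
    }

    static List<String> countWithPosition(List<Pair> items) {
        // keyByPosition(items).process(new CountPerKey()) would not compile here:
        // the function declares KEY = String, but the keyed result has K = PositionKey.
        return keyByPosition(items).process(new CountPerUntypedKey());
    }

    public static void main(String[] args) {
        List<Pair> items = Arrays.asList(
                new Pair("a", "1"), new Pair("b", "2"), new Pair("a", "3"));
        System.out.println(countWithSelector(items)); // prints [a=2, b=1]
        System.out.println(countWithPosition(items)); // prints [a=2, b=1]
    }
}
```

Both variants produce the same result; the difference is only in what the compiler knows about the key, which is why the position-based variant needs the cast.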

3
votes

What Fabian said :) Using Tuple should work, but does involve some ugly type casts in your ProcessWindowFunction. Using a KeySelector is easy and results in cleaner code. E.g.

.keyBy(new KeySelector<Tuple2<String, JSONObject>, String>() {

    @Override
    public String getKey(Tuple2<String, JSONObject> in) throws Exception {
        return in.f0;
    }
})

The above then lets you define a ProcessWindowFunction like:

public class MyProcessWindows extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, TimeWindow> {