
I am a newbie to Flink. Sometimes there are cases where I want to do an aggregation on a DataStream without needing to do a keyBy first. Why doesn't Flink support aggregation (sum, min, max, etc.) on a DataStream?

Thank you, Ahmed.


2 Answers

1 vote

Flink does support aggregation on a non-keyed stream, but you have to apply a windowAll operation first and then apply the aggregation. windowAll reduces the parallelism to 1, meaning all the data flows through a single task slot. This is by design: with more than one task slot, each slot could only aggregate the data that happens to reach that slot, not the data across all slots.

If windowAll with parallelism one doesn't fit your use case (i.e. the source produces a large number of records), you can first apply keyBy followed by a windowed aggregation, which produces a pre-aggregated result per key, and then apply windowAll and a final aggregation on top. This way the per-key aggregation runs in parallel across task slots, and only the already-reduced data is aggregated in a single task slot at the end.

Following is an example of windowAll without a keyBy operation (here list is assumed to contain Flink Tuple elements, since positional aggregations such as max(1) require a tuple type):

environment.fromCollection(list)
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .max(1);
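
Note that event-time windows only fire as the watermark advances, so the stream needs timestamps and watermarks assigned before windowAll. Below is a minimal sketch of that, assuming (purely for illustration) that list is a Collection<Tuple2<Long, Integer>> whose first field carries the event-time timestamp in milliseconds:

// Hypothetical element type: Tuple2<Long, Integer> = (event timestamp in ms, value to aggregate).
environment.fromCollection(list)
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<Long, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) -> element.f0))
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .max(1)
        .print();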

Following is an example of windowAll after a keyBy operation (again assuming Tuple elements, keyed and aggregated by field position):

environment.fromCollection(list)
        .keyBy(1)                                                     // per-key pre-aggregation, runs in parallel
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .maxBy(1)
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))      // global aggregation over the reduced stream, parallelism 1
        .max(1);

Reference: the Flink documentation on window operators.

1 vote

With FLIP-134 the Flink community has decided to deprecate all of these relational methods from the DataStream API:

  • DataStream#project
  • Windowed/KeyedStream#sum, min, max, minBy, maxBy
  • DataStream#keyBy where the key is specified with a field name or index (including ConnectedStreams#keyBy)

The rationale behind this decision is that Table/SQL is a more complete and more performant relational API, and it already supports both batch and streaming. With these APIs you can easily perform global aggregations without having to do a keyBy or GROUP BY first.

An example:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// A non-keyed stream of plain integers.
SingleOutputStreamOperator<Integer> numbers = env.fromElements(0, 1, 1, 0, 3, 2);

// Interpret the stream as a table with a single column named "n".
Table data = tableEnv.fromDataStream(numbers, $("n"));

// Global aggregation -- no keyBy / GROUP BY needed.
Table results = data.select($("n").max());

tableEnv
        .toRetractStream(results, Row.class)
        .print();

env.execute();
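
For comparison, here is a sketch of the same global aggregation expressed in SQL, reusing tableEnv and data from the example above (the view name numbers_view and the alias max_n are purely illustrative):

// Register the Table under a name so it can be referenced from SQL (the view name is illustrative).
tableEnv.createTemporaryView("numbers_view", data);

Table sqlResults = tableEnv.sqlQuery("SELECT MAX(n) AS max_n FROM numbers_view");

tableEnv
        .toRetractStream(sqlResults, Row.class)
        .print();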