I'm reading from a Kafka cluster in a Flink streaming app. After getting the source stream, I want to aggregate events by a composite key and an event-time tumbling window, and then write the result to a table. The problem is that my AggregateFunction, which just counts the number of clicks per clientId, returns only the accumulated result and not the key it belongs to, so I can't find a way to get the key of each output record.

    DataStream<Event> stream = environment.addSource(mySource);

    stream.keyBy(new KeySelector<Event, Integer>() {
            public Integer getKey(Event event) { return event.getClientId(); }
        })
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .aggregate(new MyAggregateFunction());

How do I get the key that I specified in keyBy? I did not put the key of the input events into the accumulator, because that didn't feel like a clean solution.

Answer

Rather than

    .aggregate(new MyAggregateFunction())

you can use

    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

and in this case the process method of your ProcessWindowFunction is passed the key, along with the pre-aggregated result of your AggregateFunction (as an Iterable that contains exactly one element) and a Context object with other potentially relevant information, such as the window's start and end. See the section of the Flink docs on ProcessWindowFunction with Incremental Aggregation for more details.
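A minimal sketch of this pattern follows. It assumes a hypothetical Event class with an int clientId (the question doesn't show the real one), chooses Tuple3<clientId, windowEnd, count> as an illustrative output type (a POJO matching your table would work just as well), and uses Time.minutes(...) as in the question, which may be deprecated in newer Flink releases. The AggregateFunction only counts, so its accumulator stays key-free; the ProcessWindowFunction attaches the key and window end to the single pre-aggregated value.

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ClickCounting {

    // Hypothetical stand-in for the question's Event type.
    public static class Event {
        private final int clientId;

        public Event(int clientId) { this.clientId = clientId; }

        public int getClientId() { return clientId; }
    }

    // Counts events per key and window; the accumulator is a plain Long with no key in it.
    public static class MyAggregateFunction implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() { return 0L; }

        @Override
        public Long add(Event event, Long accumulator) { return accumulator + 1; }

        @Override
        public Long getResult(Long accumulator) { return accumulator; }

        @Override
        public Long merge(Long a, Long b) { return a + b; }
    }

    // Called once per key and window. Because of the pre-aggregation, "counts"
    // holds a single element: the value returned by MyAggregateFunction.getResult.
    public static class MyProcessWindowFunction
            extends ProcessWindowFunction<Long, Tuple3<Integer, Long, Long>, Integer, TimeWindow> {
        @Override
        public void process(Integer clientId,
                            Context context,
                            Iterable<Long> counts,
                            Collector<Tuple3<Integer, Long, Long>> out) {
            long count = counts.iterator().next();
            // Emit (clientId, window end timestamp in ms, click count).
            out.collect(Tuple3.of(clientId, context.window().getEnd(), count));
        }
    }

    // Wires both functions into the keyed, windowed stream from the question.
    public static DataStream<Tuple3<Integer, Long, Long>> countClicks(DataStream<Event> stream) {
        return stream
                .keyBy(new KeySelector<Event, Integer>() {
                    @Override
                    public Integer getKey(Event event) { return event.getClientId(); }
                })
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());
    }
}
```

The KEY type parameter of the ProcessWindowFunction (Integer here) must match what your KeySelector returns. Also keep in mind that event-time windows only fire once the watermark passes the window end, so the source still needs a timestamp and watermark strategy, which is not shown here.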