I'm reading from a Kafka cluster in a Flink streaming app. After getting the source stream i want to aggregate events by a composite key and a timeEvent tumbling window and then write result to a table. The problem is that after applying my aggregateFunction that just counts number of clicks by clientId i don't find the way to get the key of each output record since the api returns an instance of accumulated result but not the corresponding key.
DataStream<Event> stream = environment.addSource(mySource)
stream.keyBy(new KeySelector<Event,Integer>() {
public Integer getKey(Event event) { return event.getClientId(); })
.window(TumblingEventTimeWindows.of(Time.minutes(1))).aggregate(new MyAggregateFunction)
How do i get the key that i specified before? I did not inject key of the input events in the accumulator as i felt i wouldn't be nice.