I am trying to do a custom aggregation on a Structured Streaming Dataset with event-time windowing.
First, I tried to use the Aggregator interface (typed UDAF) with the .agg function, something like:
val aggregatedDataset = streamDataset
.select($"id", $"time", $"eventType", $"eventValue")
.groupBy(window($"time", "1 hour"), $"id").agg(CustomAggregator("time","eventType","eventValue").toColumn.as("aggregation"))
However, inside the reduce function this aggregation only sees the newly arrived input element, not the whole group.
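For reference, here is a minimal sketch of how a typed Aggregator is structured, assuming Spark 3.0+ and a DoubleType eventValue column. By design, reduce is called once per input row and folds only that row into a running buffer; the result for the whole group comes from finish, after all partial buffers of the group have been combined with merge. The names AvgBuffer, MeanValueAggregator and WindowedAggregation, the mean itself, and the 10-minute watermark are hypothetical placeholders. functions.udaf (added in Spark 3.0) wraps the typed Aggregator so it can be passed to .agg after an untyped groupBy(window(...), ...).

```scala
import org.apache.spark.sql.{DataFrame, Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf, window}

// Hypothetical running buffer: number of values seen so far and their sum.
case class AvgBuffer(count: Long, sum: Double)

// Hypothetical aggregator computing the mean of eventValue per group.
object MeanValueAggregator extends Aggregator[Double, AvgBuffer, Double] {
  // Empty buffer for a new group (or a new partial aggregate).
  def zero: AvgBuffer = AvgBuffer(0L, 0.0)

  // Called once per input row: folds ONE new value into the running buffer.
  def reduce(buf: AvgBuffer, value: Double): AvgBuffer =
    AvgBuffer(buf.count + 1, buf.sum + value)

  // Combines two partial buffers (e.g. from different partitions).
  def merge(a: AvgBuffer, b: AvgBuffer): AvgBuffer =
    AvgBuffer(a.count + b.count, a.sum + b.sum)

  // Produces the group's result from the fully merged buffer.
  def finish(buf: AvgBuffer): Double =
    if (buf.count == 0) 0.0 else buf.sum / buf.count

  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object WindowedAggregation {
  // Spark 3.0+: expose the typed Aggregator as an untyped UDAF usable in .agg(...).
  private val meanValue = udaf(MeanValueAggregator)

  def aggregate(events: DataFrame): DataFrame =
    events
      .withWatermark("time", "10 minutes") // assumed lateness bound
      .groupBy(window(col("time"), "1 hour"), col("id"))
      .agg(meanValue(col("eventValue")).as("aggregation"))
}
```

Because each reduce call only receives the new row plus the buffer, anything the aggregation needs to remember about earlier rows of the same group has to be kept in the buffer type.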
So I am now trying to use the GroupState functions (mapGroupsWithState, flatMapGroupsWithState), or even just the mapGroups function (without state), to perform my aggregation.
But my groupBy operation returns a RelationalGroupedDataset, and I need a KeyValueGroupedDataset to use these map functions. groupByKey only takes a key function, so I cannot pass it the window($"time", "1 hour") column.
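One possible workaround, sketched here under stated assumptions rather than as a definitive solution, is to compute the window yourself inside the groupByKey key function and then run flatMapGroupsWithState. The key function assumes 1-hour tumbling windows aligned to the Unix epoch, which is how window($"time", "1 hour") aligns them when no start offset is given. The case classes Event, WindowKey and WindowResult, the object WindowedGroupState, the count/total aggregation and the 10-minute watermark are all hypothetical; your own logic would replace the fold. Event-time timeouts require a watermark on the input, which is why withWatermark is applied before grouping.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical typed view of the selected columns.
case class Event(id: String, time: Timestamp, eventType: String, eventValue: Double)
// Grouping key: entity id plus the start of its 1-hour tumbling window.
case class WindowKey(id: String, windowStart: Timestamp)
// Hypothetical per-window state, also used as the output record.
case class WindowResult(id: String, windowStart: Timestamp, count: Long, total: Double)

object WindowedGroupState {
  val WindowMs: Long = 60L * 60L * 1000L

  // Epoch-aligned tumbling window start (assumes no window start offset).
  def windowStart(ts: Timestamp): Timestamp =
    new Timestamp(ts.getTime - Math.floorMod(ts.getTime, WindowMs))

  // Folds the new events of one (id, window) group into the stored state.
  def updateWindow(key: WindowKey, events: Iterator[Event],
                   state: GroupState[WindowResult]): Iterator[WindowResult] = {
    if (state.hasTimedOut) {
      // Watermark has passed the window end: emit the final result, drop state.
      val finalResult = state.get
      state.remove()
      Iterator(finalResult)
    } else {
      val current = state.getOption.getOrElse(WindowResult(key.id, key.windowStart, 0L, 0.0))
      val updated = events.foldLeft(current) { (acc, e) =>
        acc.copy(count = acc.count + 1, total = acc.total + e.eventValue)
      }
      state.update(updated)
      // Request a call with hasTimedOut = true once the watermark passes the window end.
      state.setTimeoutTimestamp(key.windowStart.getTime + WindowMs)
      Iterator.empty
    }
  }

  def aggregate(spark: SparkSession, events: Dataset[Event]): Dataset[WindowResult] = {
    import spark.implicits._
    events
      .withWatermark("time", "10 minutes") // assumed lateness bound
      .groupByKey(e => WindowKey(e.id, windowStart(e.time)))
      .flatMapGroupsWithState[WindowResult, WindowResult](
        OutputMode.Append, GroupStateTimeout.EventTimeTimeout)(updateWindow _)
  }
}
```

To use it, the selected DataFrame would be converted with .as[Event] (with spark.implicits._ in scope) and the query started in append output mode, matching the OutputMode.Append passed to flatMapGroupsWithState. Each window's result is then emitted once, after the watermark passes the end of that window.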
How can I do a custom aggregation with Structured Streaming over event-time windows?
Thanks!