I'm using Kafka Streams to process time-series data. One use case is to aggregate data on an hourly basis for each sensor (the sensor ID is the message key in the topic test).
I have written a pipeline which groups by key (sensor ID) and then counts the readings for every hour.
The problem is that there are some duplicate messages in the test topic (same sensor ID and timestamp). I want to consider only the latest message.
Is there something in the Streams DSL API to accomplish this?
meterDataStream
  .groupByKey()
  .count(
    // 1-hour tumbling windows; retention equal to the window size
    TimeWindows
      .of(TimeUnit.HOURS.toMillis(1))
      .until(TimeUnit.HOURS.toMillis(1)),
    "counts")
  .foreach((key, value) => {
    // key is a Windowed[K]; log the window bounds and the count
    val start = epochMillistoDate(key.window().start())
    val end = epochMillistoDate(key.window().end())
    logger.info(s"$start - $end\t->$value")
  })
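To make the intended semantics concrete, here is a plain-Scala model (not the Streams DSL) of what I'm after: for each (sensorId, timestamp) pair only the latest-arrived record counts, and the hourly count is taken after that deduplication. The names Reading, dedupLatest and hourlyCounts are mine, invented for this sketch:

```scala
// A reading; duplicates share (sensorId, timestamp), later arrivals win.
case class Reading(sensorId: String, timestamp: Long, value: Double)

object DedupSketch {
  // Keep only the latest record per (sensorId, timestamp) -- i.e. the
  // semantics of grouping on that compound key and reducing to the newest.
  def dedupLatest(readings: Seq[Reading]): Seq[Reading] =
    readings
      .groupBy(r => (r.sensorId, r.timestamp))
      .values
      .map(_.last)   // groupBy preserves arrival order, so last = latest
      .toSeq

  // Hourly counts per sensor, applied after deduplication.
  def hourlyCounts(readings: Seq[Reading]): Map[(String, Long), Int] = {
    val hourMillis = 60L * 60 * 1000
    dedupLatest(readings)
      .groupBy(r => (r.sensorId, r.timestamp / hourMillis))
      .map { case (k, group) => (k, group.size) }
  }
}
```

In the DSL itself I imagine this could be expressed by re-keying the stream on (sensorId, timestamp) and applying reduce((_, latest) => latest) before the windowed count, but I haven't confirmed that this is the idiomatic way.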