
I'm using Kafka Streams to process time series data. One use case is to aggregate data on an hourly basis for each sensor (the sensor ID is the message key in the topic test).

I have written a pipeline which groups by key (sensor ID) and then counts the readings for every hour.

The problem is that there are some duplicate messages in the test topic (same sensor ID and timestamp). I want to only consider the latest message.

Is there something in the Streams DSL API to accomplish this?

  meterDataStream
   .groupByKey()
   .count(
     TimeWindows
       .of(TimeUnit.HOURS.toMillis(1))
       .until(TimeUnit.HOURS.toMillis(1)), 
     "counts")
   .foreach((key, value) => {
     val start = epochMillistoDate(key.window().start())
     val end   = epochMillistoDate(key.window().end())
     logger.info(s"$start - $end\t->$value")
   })

1 Answer


You will need to build your own deduplication operator for this.

meterDataStream
    .transform(/*write your own deduplicator*/)
    .groupByKey()....

The deduplicator (i.e., a Transformer) must have an attached state store, and you might want to look into punctuations for emitting the buffered records. See the Kafka Streams documentation for more details.
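
To make the idea a bit more concrete, here is a minimal sketch of just the deduplication logic in plain Scala. It deliberately does not use the Kafka Streams API: the `Reading` type, the `Deduplicator` class and its `add`/`flush` methods are hypothetical names invented for illustration, and the in-memory map only stands in for the state store a real Transformer would attach. The assumption is that "latest" means the most recently received message for a given sensor ID and timestamp.

```scala
import scala.collection.mutable

// Hypothetical reading type; the message schema is not shown in the question.
final case class Reading(sensorId: String, timestampMs: Long, value: Double)

// In-memory stand-in for the state store attached to a Transformer.
final class Deduplicator {
  // (sensorId, timestamp) -> most recently received reading for that pair
  private val store = mutable.Map.empty[(String, Long), Reading]

  // Roughly what transform() would do: buffer the record, so a later
  // duplicate with the same sensor ID and timestamp overwrites the earlier one.
  def add(r: Reading): Unit =
    store.update((r.sensorId, r.timestampMs), r)

  // Roughly what a punctuation would do: emit every buffered reading
  // older than the cutoff and remove it from the store.
  def flush(cutoffMs: Long): List[Reading] = {
    val ready = store.values.filter(_.timestampMs < cutoffMs).toList
    ready.foreach(r => store.remove((r.sensorId, r.timestampMs)))
    ready
  }
}
```

In a real topology the same two steps would live in the Transformer: buffer in the store on every record, then forward the surviving records downstream from a punctuation, so that `groupByKey()` and the windowed count only ever see one message per sensor ID and timestamp. How long to wait before flushing is a trade-off between latency and how late a duplicate may arrive.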