3 votes

I am having a hard time comprehending how Windowing works in Kafka Streams. The results don't seem to align with what I have read and understood so far.

I have created a KSQL stream with a backing topic. One of the 'columns' in the KSQL SELECT statement has been designated as the TIMESTAMP for the topic.

CREATE STREAM my_stream WITH (KAFKA_TOPIC='my-stream-topic', VALUE_FORMAT='json', TIMESTAMP='recorded_timestamp') AS SELECT <select list> PARTITION BY PARTITION_KEY;

Records in my-stream-topic are grouped by key (PARTITION_KEY) and windowed with a hopping window:

val dataWindowed: TimeWindowedKStream[String, TopicValue] = builder.stream("my-stream-topic", consumed)
    .groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1)).until(TimeUnit.MINUTES.toMillis(5)))
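To make the window arithmetic concrete: with a 5-minute size and a 1-minute advance, every record falls into five overlapping windows whose start times are epoch-aligned multiples of the advance. A minimal plain-Scala sketch of that assignment (the helper name `windowsFor` is mine, not a Kafka Streams API):

```scala
// Hopping-window assignment, assuming Kafka Streams' epoch-aligned semantics:
// window start times are multiples of the advance interval.
val sizeMs    = 5 * 60 * 1000L // TimeWindows.of(5 minutes)
val advanceMs = 1 * 60 * 1000L // .advanceBy(1 minute)

// All window start times whose window [start, start + sizeMs) contains ts.
def windowsFor(ts: Long): Seq[Long] = {
  val lastStart  = ts - (ts % advanceMs)                        // latest window containing ts
  val firstStart = math.max(0L, lastStart - sizeMs + advanceMs) // earliest such window
  firstStart to lastStart by advanceMs
}

// A record timestamped 07:02:30 lands in the windows starting 06:58 through
// 07:02 -- five windows, including the 7:00-7:05 window from the question.
val starts = windowsFor(7L * 3600000L + 2L * 60000L + 30000L)
```

So two records timestamped inside 7:00 - 7:05 must both reach the aggregator for that window; if one is missing, it is usually a timestamp problem rather than a windowing one.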

Records are aggregated via

val dataAgg: KTable[Windowed[String], TopicStats] = dataWindowed
    .aggregate(
      new Initializer[TopicStats] { <code omitted> },
      new Aggregator[String, TopicValue, TopicStats] { <code omitted> },
      Materialized.`as`[String, TopicStats, WindowStore[Bytes, Array[Byte]]]("time-windowed-aggregated-stream-store")
        .withValueSerde(new JSONSerde[TopicStats])
    )
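For reference, the Initializer/Aggregator pair conceptually folds every record that falls inside a window into one stats object. A hypothetical plain-Scala stand-in (the TopicStats fields `count` and `sum` are assumptions, since the real code is omitted):

```scala
// Hypothetical stand-in for the omitted aggregation logic:
// count the records and sum a numeric field per window.
case class TopicStats(count: Long, sum: Double)

val init: () => TopicStats = () => TopicStats(0L, 0.0)

def agg(key: String, value: Double, acc: TopicStats): TopicStats =
  TopicStats(acc.count + 1, acc.sum + value)

// Both records that fall in the 7:00-7:05 window should contribute:
val stats = Seq(1.5, 2.5).foldLeft(init())((acc, v) => agg("key", v, acc))
```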

  val topicStats: KStream[String, TopicValueStats] = dataAgg
    .toStream()
    .map( <code omitted for brevity>)

I then print to the console via

dataAgg.print()
topicStats.print()

The first window in the group translates to 7:00 - 7:05.

When I examine the records in my-stream-topic via a console consumer I see that there are 2 records that should fall within the above window. However, only 1 of them is picked up by the aggregator.

I thought the dataAgg windowed KTable would contain one record per grouped key, with the aggregate computed from both records. The aggregate value that is printed is incorrect.

What am I missing?

Your understanding sounds correct. Why do you show a KSQL statement, though? KSQL is unrelated to running your Kafka Streams code. Thus, one suspicion might be that your application does not use the correct timestamp. You would need to configure a custom timestamp extractor and set it via StreamsConfig. By default, Kafka Streams uses the embedded metadata timestamp. - Matthias J. Sax
I included KSQL to show that I am designating a column as the timestamp column. The KSQL docs say that that column will be used for window operations, which lets me avoid creating a timestamp extractor. When I print the topic source stream in the KSQL CLI, I see that the rowtime has the right timestamp value. Can I not rely on the KSQL-generated timestamp? - rams
Ok, I have not explained it very well... It is correct that you write to an output topic named my-stream-topic; however, the WITH (TIMESTAMP=...) clause is only a metadata operation for the created stream my_stream. If you use my_stream in a second query, that second query will use recorded_timestamp as the timestamp by default. However, the records written into my-stream-topic inherit their timestamp from the source topic of your select... part. - Matthias J. Sax
I am not sure what your select... part computes... if it does not operate on the timestamp, you can set the input timestamp for your select... part to recorded_timestamp -- this way, recorded_timestamp will be set in the record metadata field when writing to my-stream-topic. I agree that the semantics are not very intuitive. I talked to a colleague who works on KSQL and we will raise an issue to fix it. - Matthias J. Sax

1 Answer

2 votes

KSQL can set record timestamps on write; however, you need to specify the timestamp when creating the input stream, not when defining the output stream. I.e., the timestamp specified for the input stream will be used to set the record metadata field on write.

This behavior is rather unintuitive and I opened a ticket for this issue: https://github.com/confluentinc/ksql/issues/1367

Thus, you need to specify the WITH (TIMESTAMP='recorded_timestamp') clause when creating the input stream for the query you showed in the question. If this is not possible because your query needs to operate on a different timestamp, you need to add a second query that copies the data into a new topic:

CREATE STREAM my_stream_with_ts
    WITH (KAFKA_TOPIC='my-stream-topic-with-ts')
AS SELECT * FROM my_stream PARTITION BY PARTITION_KEY;
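For completeness, the first option -- declaring the timestamp column when creating the input stream -- might look like the following sketch; the stream name, topic name, and column list here are assumptions, not taken from the question:

```sql
-- Hypothetical input-stream declaration; adjust names and columns to your data.
CREATE STREAM my_input_stream (recorded_timestamp BIGINT, partition_key VARCHAR)
    WITH (KAFKA_TOPIC='my-input-topic',
          VALUE_FORMAT='JSON',
          TIMESTAMP='recorded_timestamp');
```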

As an alternative, you can set a custom timestamp extractor for your Kafka Streams application to extract the timestamp from the payload.
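A minimal sketch of the parsing half of such an extractor, assuming the JSON payload carries recorded_timestamp as epoch milliseconds; in a real application this function would be called from a class implementing org.apache.kafka.streams.processor.TimestampExtractor, registered via the default.timestamp.extractor config. The helper name and the fallback behavior are my assumptions:

```scala
// Extract "recorded_timestamp" (epoch millis) from a JSON payload string,
// falling back to a supplied timestamp (e.g. the embedded metadata timestamp)
// when the field is absent. A real extractor would wrap this logic in a class
// implementing org.apache.kafka.streams.processor.TimestampExtractor.
def recordedTimestamp(json: String, fallbackTs: Long): Long = {
  val field = """"recorded_timestamp"\s*:\s*(\d+)""".r
  field.findFirstMatchIn(json).map(_.group(1).toLong).getOrElse(fallbackTs)
}
```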