I have a Kafka Streams application with the following domain classes:

case class Test (testid: String, testcount: Long, testUser: String)
case class TestAgg (testid: String, aggCount: Long)

I have a testStream which reads from an input topic (testid being the key) into the Test case class. This works just fine.

However, when I try to do windowed aggregation on the testStream I observe strange behavior:

val aggStore =
  testStream
    // Keep only the fields needed for the aggregate.
    .mapValues((testid, test) => TestAgg(test.testid, test.testcount))
    .groupByKey
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate[TestAgg](TestAgg("", 0L))(
      // n is the incoming value, o is the aggregate so far.
      (testid: String, n: TestAgg, o: TestAgg) =>
        TestAgg(n.testid, n.aggCount + o.aggCount)
    )(
      Materialized.as[String, TestAgg, ByteArrayWindowStore]("Test-agg-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(TestAgg.TestAggBytesSerDe)
    )


aggStore
  .toStream
  .map((k,v) => (k.key(), v.asJson.toString()))
  .print(sysout)

I see the following output in the console:

[console]: Test_89, {
  "testid" : "Test_89",
  "aggCount" : 60515984
}
[console]: Test_33, {
  "testid" : "Test_33",
  "aggCount" : 45388033
}
[console]: Test_48, {
  "testid" : "Test_48",
  "aggCount" : 15130551
}

However, the output is printed to the console every 10 seconds. Shouldn't it print once per minute? What am I missing here?

If I add suppress:

aggStore
  .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
  .toStream
  .map((k,v) => (k.key(), v.asJson.toString()))
  .print(sysout)

there is no output at all.

How do I explain this behavior?

EDIT:

I changed the window:

 .windowedBy(TimeWindows.of(Duration.ofMinutes(1L))
  .grace(Duration.ofSeconds(1L)))

after which I observe two things:

  1. Without suppress: the stream emits every 10 seconds. I have no idea where the 10 seconds comes from.
  2. With suppress:

     .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()).withName("Test-suppress-store"))

     the stream does emit every 1 minute, so adding the grace period must have helped.

But my initial question, relating to observation 1, still remains: why 10 seconds and not 1 minute? I am not specifying 10 seconds anywhere.

1 Answer

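Do you have the timestamps of the records being processed? Without suppress, a windowed aggregation emits an updated result every time a new record is processed, even if the window has not closed yet. The window size only decides which records are grouped together, not how often results are forwarded downstream. With record caching enabled, those updates are batched and forwarded when the cache is flushed or on commit (commit.interval.ms), so the 10-second cadence most likely reflects how often records arrive or how often that flush happens, not the window size.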
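
To make the difference concrete, here is a minimal, library-free Scala sketch of a 1-minute tumbling window. It is only a model of the semantics, not the Kafka Streams API: `WindowEmitSketch`, `Record`, `Update`, `emitPerUpdate` and `emitOnClose` are hypothetical names invented for this illustration, and the timestamps are made up. It assumes that stream time is simply the highest record timestamp seen so far, and that a window closes once stream time reaches window end plus grace.

```scala
// Simplified model of windowed aggregation output. Not the Kafka Streams API.
object WindowEmitSketch {
  final case class Record(key: String, count: Long, timestampMs: Long)
  final case class Update(key: String, windowStartMs: Long, aggCount: Long)

  val windowSizeMs: Long = 60000L // 1-minute tumbling windows

  private def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // No suppression: every incoming record yields an updated result for its window.
  def emitPerUpdate(records: Seq[Record]): Seq[Update] = {
    val state = scala.collection.mutable.Map.empty[(String, Long), Long]
    records.map { r =>
      val k = (r.key, windowStart(r.timestampMs))
      val total = state.getOrElse(k, 0L) + r.count
      state(k) = total
      Update(r.key, k._2, total)
    }
  }

  // "Until window closes": one final result per window, emitted only after
  // stream time has reached windowEnd + grace.
  def emitOnClose(records: Seq[Record], graceMs: Long): Seq[Update] = {
    val state = scala.collection.mutable.LinkedHashMap.empty[(String, Long), Long]
    val out = scala.collection.mutable.ArrayBuffer.empty[Update]
    var streamTime = Long.MinValue
    records.foreach { r =>
      // Stream time only advances when records arrive.
      streamTime = math.max(streamTime, r.timestampMs)
      val k = (r.key, windowStart(r.timestampMs))
      // Records for windows that are already closed are dropped.
      if (k._2 + windowSizeMs + graceMs > streamTime)
        state(k) = state.getOrElse(k, 0L) + r.count
      val closed = state.keys.filter { case (_, start) =>
        start + windowSizeMs + graceMs <= streamTime
      }.toList
      closed.foreach { c =>
        out += Update(c._1, c._2, state(c))
        state.remove(c)
      }
    }
    out.toSeq
  }
}
```

With three records for key "Test_89" at 1 s, 11 s and 61.5 s carrying counts 5, 7 and 3, `emitPerUpdate` returns three updates (running totals 5, 12, 3), while `emitOnClose` with a 1-second grace returns a single result of 12 for the first window. With a 24-hour grace, `emitOnClose` returns nothing for the same input. That would match the "no output at all" observation if the Kafka Streams version in use applies the historical 24-hour default grace period to `TimeWindows.of` (an assumption about the asker's setup, which the question does not state).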