I have a Kafka Streams application with the following domain classes:
case class Test(testid: String, testCount: Long, testUser: String)
case class TestAgg(testid: String, aggCount: Long)
I have a testStream which reads from an input topic (testid being the key) into the Test case class. This works just fine.
However, when I try to do a windowed aggregation on the testStream, I observe strange behavior:
val aggStore =
  testStream
    // keep only the fields needed for the aggregate
    .mapValues((testid, test) => TestAgg(test.testid, test.testCount))
    .groupByKey
    // 1-minute tumbling windows
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate[TestAgg](TestAgg("", 0L))(
      // n is the incoming value, o is the aggregate so far
      (testid: String, n: TestAgg, o: TestAgg) => {
        TestAgg(n.testid, n.aggCount + o.aggCount)
      }
    )(Materialized.as[String, TestAgg, ByteArrayWindowStore]("Test-agg-store")
      .withKeySerde(Serdes.String())
      .withValueSerde(TestAgg.TestAggBytesSerDe))
aggStore
  .toStream
  .map((k, v) => (k.key(), v.asJson.toString()))
  .print(sysout)
I see the following output in the console:
[console]: Test_89, {
"testid" : "Test_89",
"aggCount" : 60515984
}
[console]: Test_33, {
"testid" : "Test_33",
"aggCount" : 45388033
}
[console]: Test_48, {
"testid" : "Test_48",
"aggCount" : 15130551
}
However, I see the output printed to the console every 10 seconds. Should it not print every 1 minute? What am I missing here?
If I add suppress:
aggStore
  .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
  .toStream
  .map((k, v) => (k.key(), v.asJson.toString()))
  .print(sysout)
there is no output at all.
How do I explain this behavior?
EDIT:
I changed the window:
.windowedBy(TimeWindows.of(Duration.ofMinutes(1L))
  .grace(Duration.ofSeconds(1L)))
after which I observe two things:
- Without suppress: the stream emits every 10 seconds. I have no idea why 10 seconds.
- With suppress:
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()).withName("Test-suppress-store"))
the stream does emit every 1 minute. (So adding the grace must have helped.)
But my initial question, relating to observation 1, still remains: why 10 seconds and not 1 minute? I am not specifying 10 seconds anywhere.
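To reason about the suppress observation, here is a minimal sketch in plain Scala (no Kafka dependency) of the window-close arithmetic as I understand it. It assumes tumbling windows aligned to the epoch, and that a window counts as closed once stream time reaches the window end plus the grace period. The object name, method names, and sample timestamp are hypothetical. The roughly 24-hour default grace is an assumption about TimeWindows.of when no grace is set, not something I have confirmed for my version.

```scala
object WindowCloseSketch {
  // Window size used in the question: 1 minute.
  val windowSizeMs: Long = 60 * 1000L

  // Start of the epoch-aligned tumbling window containing the record timestamp.
  def windowStart(timestampMs: Long): Long =
    timestampMs - (timestampMs % windowSizeMs)

  // Stream time at which the window is treated as closed: window end plus grace.
  def closeTime(timestampMs: Long, graceMs: Long): Long =
    windowStart(timestampMs) + windowSizeMs + graceMs

  def main(args: Array[String]): Unit = {
    val ts = 125000L                                // hypothetical record timestamp (2 min 5 s)
    val oneSecondGraceMs = 1000L                    // grace from the EDIT above
    val assumedDefaultGraceMs = 24L * 60 * 60 * 1000 // ASSUMPTION: roughly 24 h when grace is unset

    println(windowStart(ts))                        // 120000
    println(closeTime(ts, oneSecondGraceMs))        // 181000
    println(closeTime(ts, assumedDefaultGraceMs))   // 86580000
  }
}
```

If that assumption holds, untilWindowCloses would have nothing to emit until stream time had advanced about a day past the window end, which would match seeing no output before grace was added. It says nothing about the 10-second interval without suppress.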