I have a KStream<String, Event> that should be windowed with windowedBy and then aggregated, but the aggregation results in an out-of-memory error:
java.lang.OutOfMemoryError: Java heap space
The Kafka Streams DSL code is as follows:
```java
// Hopping windows: 1-day size, advancing every 1 ms
TimeWindows timeWindows = TimeWindows.of(Duration.ofDays(1)).advanceBy(Duration.ofMillis(1));

Initializer<History> historyInitializer = History::new;
Aggregator<String, Event, History> historyAggregator = (key, value, aggregate) -> {
    aggregate.key = value.uuid;
    aggregate.addHistoryEventWindow(value);
    return aggregate;
};

KTable<String, History> historyWindowed = eventStreamRaw
        .filter((key, value) -> value != null)
        .groupByKey(Grouped.with(Serdes.String(), this.eventSerde))
        // segment our messages into 1-day windows
        .windowedBy(timeWindows)
        .aggregate(historyInitializer, historyAggregator, Named.as("name"),
                Materialized.with(Serdes.String(), this.historySerde))
        // emit only the final result of each window, after the window closes
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
        // re-key by "<uuid>|+|<window start>|+|<window end>"
        .groupBy(
                (key, value) -> new KeyValue<String, History>(
                        value.key + "|+|" + key.window().start() + "|+|" + key.window().end(), value),
                Grouped.with(Serdes.String(), this.historySerde))
        // adder and subtractor both simply return the incoming value
        .aggregate(History::new, (key, value, aggValue) -> value, (key, value, aggValue) -> value,
                Materialized.with(Serdes.String(), this.historySerde));
```
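For scale: a hopping window assigns every record to size / advance overlapping windows (assuming the size is an exact multiple of the advance, as it is here), and a windowed aggregation keeps a separate aggregate for each of those windows. The sketch below only does that arithmetic with `java.time.Duration` for the `TimeWindows` settings above; `windowsPerRecord` is a hypothetical helper, not part of the Kafka Streams API.

```java
import java.time.Duration;

public class HoppingWindowCount {

    // Hypothetical helper: how many hopping windows a single record falls into,
    // assuming the window size is an exact multiple of the advance interval.
    static long windowsPerRecord(Duration size, Duration advance) {
        return size.toMillis() / advance.toMillis();
    }

    public static void main(String[] args) {
        Duration size = Duration.ofDays(1);

        // Settings from the question: 1-day windows advancing every 1 ms.
        System.out.println(windowsPerRecord(size, Duration.ofMillis(1))); // 86400000

        // Tumbling windows (advance == size): each record lands in exactly one window.
        System.out.println(windowsPerRecord(size, size)); // 1
    }
}
```

For comparison, `TimeWindows.of(Duration.ofDays(1))` without `advanceBy(...)` defines tumbling windows, which is what the "segment our messages into 1-day windows" comment describes.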
While reading some articles (for example, "Kafka Streams Window By & RocksDB Tuning"), I noticed that I may need to configure the Materialized store with a retention of 1 day + 1 ms.
But adding that retention doesn't work for me:
```java
final Materialized<String, History, WindowStore<Bytes, byte[]>> store =
        Materialized.<String, History, WindowStore<Bytes, byte[]>>as("eventstore")
                .withKeySerde(Serdes.String())
                .withValueSerde(this.historySerde)
                .withRetention(Duration.ofDays(1).plus(Duration.ofMillis(1)));

KTable<String, History> historyWindowed = eventStreamRaw
        ...
        .aggregate(historyInitializer, historyAggregator, Named.as("name"), store)
```
The Java compiler throws the following error:

```
The method aggregate(Initializer<VR>, Aggregator<? super String,? super Event,VR>, Named, Materialized<String,VR,WindowStore<Bytes,byte[]>>)
in the type TimeWindowedKStream<String,Event> is not applicable for the arguments
(Initializer<History>, Aggregator<String,Event,History>, Named, Materialized<String,History,WindowStore<Bytes,byte[]>>)
```
To be honest, I don't get it. The arguments look correct to me: VR should simply be History.
So, do you know what I'm missing?
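On the retention value itself: when it builds a windowed aggregation, Kafka Streams rejects (with an `IllegalArgumentException`) a window store whose retention is shorter than the window size plus the grace period. The plain-Java sketch below mirrors that check for the numbers in this question. `retentionIsValid` is a hypothetical helper, and the zero grace period is an assumption, since the snippet does not call `grace(...)` explicitly.

```java
import java.time.Duration;

public class WindowStoreRetentionCheck {

    // Hypothetical helper mirroring the rule Kafka Streams enforces for window stores:
    // retention must be at least window size + grace period.
    static boolean retentionIsValid(Duration windowSize, Duration grace, Duration retention) {
        return retention.compareTo(windowSize.plus(grace)) >= 0;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofDays(1);
        Duration grace = Duration.ZERO; // assumption: no grace period configured
        Duration retention = Duration.ofDays(1).plus(Duration.ofMillis(1)); // value from the question

        System.out.println(retentionIsValid(size, grace, retention));               // true
        System.out.println(retentionIsValid(size, Duration.ofHours(1), retention)); // false
    }
}
```

Under these assumptions, 1 day + 1 ms satisfies the rule; a larger grace period would require a correspondingly longer retention.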
The idea behind this windowed KTable is to keep a state that holds all events of one "thing" for one day. When a new alert is produced, I want to attach all of that thing's events from the past day to the alert, so I would do a leftJoin from the Alert KStream to the History KTable. Would that be the best way to add historical data to a Kafka event? Is there a way to simply "look up" the last x days of the Event KStream? I've looked at a leftJoin of the Alert KStream with the Event KStream, but that would produce an output for every new event, which doesn't seem practicable to me.
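The "look up the last x days" idea can be illustrated in plain Java as an in-memory, per-key history: every event is stored and old ones are evicted, and an alert only reads from the history without producing output per event. This is a sketch of the semantics under assumptions, not Kafka Streams code: `Event`, `thingId`, `onEvent` and `lookup` are hypothetical names, and everything lives in a single JVM heap. Within Kafka Streams, a comparable design would be a custom processor backed by a window store queried with its `fetch(key, timeFrom, timeTo)` method, though that is not verified against the setup here.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class EventHistoryLookup {

    // Hypothetical event type: the "thing" id, an event timestamp, and a payload.
    record Event(String thingId, Instant timestamp, String payload) {}

    // Per thing: events ordered by timestamp (events with equal timestamps kept in arrival order).
    private final Map<String, NavigableMap<Instant, List<Event>>> byThing = new HashMap<>();
    private final Duration retention;

    EventHistoryLookup(Duration retention) {
        this.retention = retention;
    }

    // Called for every incoming event: store it, then drop this thing's events
    // that are older than `retention` relative to the new event's timestamp.
    void onEvent(Event e) {
        NavigableMap<Instant, List<Event>> history =
                byThing.computeIfAbsent(e.thingId(), k -> new TreeMap<>());
        history.computeIfAbsent(e.timestamp(), k -> new ArrayList<>()).add(e);
        history.headMap(e.timestamp().minus(retention), false).clear();
    }

    // Called for an alert: return that thing's events within `lookback` before the alert.
    List<Event> lookup(String thingId, Instant alertTime, Duration lookback) {
        List<Event> result = new ArrayList<>();
        NavigableMap<Instant, List<Event>> history = byThing.get(thingId);
        if (history == null) {
            return result;
        }
        history.subMap(alertTime.minus(lookback), true, alertTime, true)
               .values()
               .forEach(result::addAll);
        return result;
    }

    public static void main(String[] args) {
        EventHistoryLookup store = new EventHistoryLookup(Duration.ofDays(1));
        Instant now = Instant.parse("2024-01-02T12:00:00Z");
        store.onEvent(new Event("thing-1", now.minus(Duration.ofHours(30)), "old"));
        store.onEvent(new Event("thing-1", now.minus(Duration.ofHours(2)), "recent"));
        store.onEvent(new Event("thing-2", now.minus(Duration.ofHours(1)), "other thing"));

        // Only thing-1's event from the last day is attached to the alert.
        System.out.println(store.lookup("thing-1", now, Duration.ofDays(1)).size()); // 1
    }
}
```

Each event is stored once per key here, so memory grows with the number of events in the retention period rather than with the number of overlapping windows.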
Many thanks for your help. I hope it's just a simple fix. Highly appreciated!