
I have a KStream<String, Event> that should be windowed with windowedBy and aggregated, but this results in an out-of-memory error:

java.lang.OutOfMemoryError: Java heap space

The KStream DSL is as follows:

TimeWindows timeWindows = TimeWindows.of(Duration.ofDays(1)).advanceBy(Duration.ofMillis(1));
Initializer<History> historyInitializer = History::new;
Aggregator<String, Event, History> historyAggregator = (key, value, aggregate) -> {
    aggregate.key = value.uuid;
    aggregate.addHistoryEventWindow(value);
    return aggregate;
};

KTable<String, History> historyWindowed = eventStreamRaw
    .filter((key, value) -> value != null)
    .groupByKey(Grouped.with(Serdes.String(), this.eventSerde))
    // segment our messages into 1-day windows
    .windowedBy(timeWindows)
    .aggregate(historyInitializer, historyAggregator, Named.as("name"), Materialized.with(Serdes.String(), this.historySerde))
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
    .groupBy(
        (key, value) -> new KeyValue<String, History>(
            value.key + "|+|" + key.window().start() + "|+|" + key.window().end(), value),
        Grouped.with(Serdes.String(), this.historySerde))
    .aggregate(History::new, (key, value, aggValue) -> value, (key, value, aggValue) -> value,
        Materialized.with(Serdes.String(), this.historySerde));
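A likely contributor to the OutOfMemoryError, not mentioned in the post: TimeWindows.of(Duration.ofDays(1)).advanceBy(Duration.ofMillis(1)) defines hopping windows, not the 1-day tumbling windows the code comment describes. Every record is assigned to each window that contains its timestamp, which here is size / advance = 86,400,000 windows per event, and the unbounded suppress() buffer would then have to hold those windowed results until each window closes. The plain-Java sketch below reproduces that window-assignment arithmetic so the count can be checked; it is a standalone re-implementation modeled on how Kafka Streams aligns hopping windows to the epoch, not a call into the Kafka API.

```java
import java.time.Duration;

class HoppingWindowCount {

    // Counts the hopping windows [start, start + size) that contain timestampMs.
    // Window starts are multiples of advanceMs (aligned to the epoch). This is a
    // standalone sketch of the assignment rule, not the Kafka Streams API.
    static long windowsContaining(long timestampMs, Duration size, Duration advance) {
        long sizeMs = size.toMillis();
        long advanceMs = advance.toMillis();
        // Earliest aligned window start whose window still covers the timestamp.
        long firstStart = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        // One window for every further start, advanceMs apart, up to timestampMs.
        return (timestampMs - firstStart) / advanceMs + 1;
    }

    public static void main(String[] args) {
        long ts = 1_700_000_000_000L; // an arbitrary epoch-millis timestamp
        Duration day = Duration.ofDays(1);

        // The configuration from the question: 1-day windows advancing by 1 ms.
        System.out.println(windowsContaining(ts, day, Duration.ofMillis(1))); // prints 86400000
        // Tumbling 1-day windows (advance == size): exactly one window per record.
        System.out.println(windowsContaining(ts, day, day)); // prints 1
    }
}
```

If 1-day tumbling windows are what is intended, dropping advanceBy(...) gives one window per record (on Kafka Streams 3.0+, where TimeWindows.of is deprecated, the equivalent is TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1))).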

While reading some articles (for example, "Kafka Streams Window By & RocksDB Tuning"), I noticed that I may have to configure the Materialized store with a retention of 1 day + 1 ms.

But when I try to add it, the code doesn't compile:

final Materialized<String, History, WindowStore<Bytes, byte[]>> store = Materialized.<String, History, WindowStore<Bytes, byte[]>>as("eventstore")
        .withKeySerde(Serdes.String())
        .withValueSerde(this.historySerde)
        .withRetention(Duration.ofDays(1).plus(Duration.ofMillis(1)));

KTable<String, History> historyWindowed = eventStreamRaw
    ...
    .aggregate(historyInitializer, historyAggregator, Named.as("name"), store)

The Java compiler throws the following error:

The method 
aggregate(Initializer<VR>, Aggregator<? super String,? super Event,VR>, Named, Materialized<String,VR,WindowStore<Bytes,byte[]>>) 
in the type TimeWindowedKStream<String,Event> is not applicable for the arguments 
(Initializer<History>, Aggregator<String,Event,History>, Named, Materialized<String,History,WindowStore<Bytes,byte[]>>)

To be honest, I don't get it. The parameters are correct; the VR type is 'History'.

So, do you know what I'm missing?

The idea of this windowed KTable is to have a state that holds all events for one "thing" for one day. When a new alert is produced, I want to attach all events of that "thing" for one day to the alert. I would then do a leftJoin from the KStream Alert to the KTable History. Would that be the best way to add historical data to a Kafka event? Is there a way to just "look up" the last x days of the KStream Events? I've checked a KStream Alert-KStream Event leftJoin, but that would produce an output for every new KStream Event, so from my point of view it is not practical.
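The "look up the last x days of events" idea can be sketched in plain Java, independent of Kafka: keep the events per key, evict those older than the look-back period, and read them only when an alert arrives, so one result is produced per alert rather than per event (a KStream-KTable leftJoin likewise emits only when a record arrives on the stream side). In Kafka Streams this state would typically live in a state store, for example a window store queried with fetch(key, from, to) from a Processor or Transformer, rather than in a HashMap. The names EventHistory, onEvent and onAlert are hypothetical, and Event is reduced to a key and a timestamp; events are assumed to arrive in timestamp order.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EventHistory {

    // Hypothetical stand-in for the question's Event: only the key ("thing") and a timestamp.
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    private final long lookBackMs;
    // Per key, events in arrival order (assumed to be timestamp order).
    private final Map<String, Deque<Event>> byKey = new HashMap<>();

    EventHistory(Duration lookBack) {
        this.lookBackMs = lookBack.toMillis();
    }

    // Called for every event: store it and evict this key's events that fell out of the look-back period.
    void onEvent(Event event) {
        Deque<Event> events = byKey.computeIfAbsent(event.key, k -> new ArrayDeque<>());
        events.addLast(event);
        evictUpTo(events, event.timestampMs - lookBackMs);
    }

    // Called for every alert: returns the key's events in (alertTs - lookBack, alertTs].
    List<Event> onAlert(String key, long alertTimestampMs) {
        List<Event> result = new ArrayList<>();
        for (Event e : byKey.getOrDefault(key, new ArrayDeque<>())) {
            if (e.timestampMs > alertTimestampMs - lookBackMs && e.timestampMs <= alertTimestampMs) {
                result.add(e);
            }
        }
        return result;
    }

    private static void evictUpTo(Deque<Event> events, long cutoffMs) {
        while (!events.isEmpty() && events.peekFirst().timestampMs <= cutoffMs) {
            events.removeFirst();
        }
    }

    public static void main(String[] args) {
        EventHistory history = new EventHistory(Duration.ofDays(1));
        long hour = Duration.ofHours(1).toMillis();
        history.onEvent(new Event("thing-1", 0));
        history.onEvent(new Event("thing-1", 20 * hour));
        history.onEvent(new Event("thing-1", 30 * hour)); // evicts the event at 0 (more than 1 day old)
        history.onEvent(new Event("thing-2", 30 * hour));
        System.out.println(history.onAlert("thing-1", 31 * hour).size()); // prints 2
    }
}
```

In this sketch eviction only happens when a key receives a new event; with a Kafka window store, the store's retention period would take care of expiring old events for all keys.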

Many thanks for your help. I hope it's just a simple fix. I highly appreciate it!


1 Answer


After looking at the post "Kafka Streams App - count and sum aggregate", I realized I had imported the wrong Bytes class. So, be sure to import org.apache.kafka.common.utils.Bytes.
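Why the compiler message was so confusing: it prints only simple type names, so a WindowStore<Bytes, byte[]> built from a different Bytes class looks identical to the expected type even though the two classes are unrelated. Because the aggregate(...) parameter is Materialized<String, VR, WindowStore<Bytes, byte[]>> with no wildcards, the type arguments must be exactly the same class. The plain-Java sketch below uses two hypothetical nested classes, both named Bytes, to show that the simple names coincide while the fully qualified names differ.

```java
import java.util.ArrayList;
import java.util.List;

class WrongImportDemo {

    // Two unrelated, hypothetical classes with the same simple name, standing in for
    // org.apache.kafka.common.utils.Bytes and a look-alike from another library.
    static final class KafkaUtils {
        static final class Bytes {}
    }

    static final class OtherLibrary {
        static final class Bytes {}
    }

    // Accepts only a list of the "Kafka" Bytes: generic type arguments must match exactly.
    static int countKeys(List<KafkaUtils.Bytes> keys) {
        return keys.size();
    }

    public static void main(String[] args) {
        List<KafkaUtils.Bytes> right = new ArrayList<>();
        System.out.println(countKeys(right)); // prints 0

        // List<OtherLibrary.Bytes> wrong = new ArrayList<>();
        // countKeys(wrong); // would not compile: List<OtherLibrary.Bytes> is not List<KafkaUtils.Bytes>

        // Both simple names are "Bytes", which is all the error message shows...
        System.out.println(KafkaUtils.Bytes.class.getSimpleName()); // prints Bytes
        System.out.println(OtherLibrary.Bytes.class.getSimpleName()); // prints Bytes
        // ...while the full names reveal two different classes.
        System.out.println(KafkaUtils.Bytes.class.getName());
        System.out.println(OtherLibrary.Bytes.class.getName());
    }
}
```

In the real code, the fix is the import line itself: import org.apache.kafka.common.utils.Bytes;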

But maybe you have a better idea for enriching a Kafka message from one stream with historical data from another stream that is related by a (foreign) key.

Thanks guys.