I'm very new to Kafka Streams and encountered a problem.
I have two topics - one holds long-term data (descriptions) and the other holds live data (live). They share a common id.
The idea is to store the data from descriptions (presumably in a KTable, keeping the latest description for each id) and, when new messages arrive on live, join them with the corresponding description by id and send the result onward.
For simplicity let's just make all types String.
So the basic idea was the one from every tutorial I've seen (note the two accessor methods need distinct names to compile):

interface Processor {
    @Input("live")
    KStream<String, String> live();

    @Input("descriptions")
    KTable<String, String> descriptions();

    @Output("output")
    KStream<String, String> output();
}
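For reference, wiring these three destinations to topics with the Kafka Streams binder would look roughly like this in application.yml (the topic names here are placeholders, not from the original question):

```yaml
spring:
  cloud:
    stream:
      bindings:
        live:
          destination: live-topic          # assumed topic name
        descriptions:
          destination: descriptions-topic  # assumed topic name
        output:
          destination: output-topic        # assumed topic name
```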
And then:
@StreamListener
@SendTo("output")
public KStream<String, String> process(
        @Input("live") KStream<String, String> live,
        @Input("descriptions") KTable<String, String> descriptions) {
    // ...
}
The problem is that the descriptions topic is not KTable-suitable: its messages have null keys.
So I can't use it as a KTable input directly, and I can't create any new intermediate topics to hold a valid re-keyed stream built from it (the cluster is basically read-only for me).
I was searching for some sort of in-memory Binding destination, but to no avail.
The way I thought it could work is to create an intermediate output that just keeps the KTable in memory, and then use that intermediate as an input for the live processing. Something like:

@StreamListener
@SendTo("intermediate")
public KTable<String, String> process(@Input("descriptions") KStream<String, String> descriptions) {
    // ...
}
I hope this is possible with these Binding semantics.
@StreamListener("descriptions")
@Output("intermediate")
public KTable<String, String> process(KStream<String, String> descriptions) {
    return descriptions
            .map((key, value) -> {
                Pojo p = parseIntoPojo(value);
                return new KeyValue<>(p.getId(), value);
            })
            .groupByKey()
            .reduce((v1, v2) -> v2);
}
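The map → groupByKey → reduce((v1, v2) -> v2) pipeline above boils down to "per extracted id, the last value seen wins", which is exactly the KTable upsert semantics being aimed for. A dependency-free sketch of that behavior (not the Kafka Streams API itself):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Dependency-free sketch of map -> groupByKey -> reduce((v1, v2) -> v2):
// for each extracted id, only the most recent value survives,
// mirroring the "latest description wins" KTable semantics.
public class LatestWinsSketch {

    // Each record is a [id, value] pair, in arrival order.
    public static Map<String, String> latestById(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] idAndValue : records) {
            // reduce((v1, v2) -> v2): the newer value replaces the older one
            table.put(idAndValue[0], idAndValue[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[]{"42", "old description"},
                new String[]{"42", "new description"}, // overwrites the old one
                new String[]{"7", "only description"});
        System.out.println(latestById(records));
        // -> {42=new description, 7=only description}
    }
}
```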
If that's not possible, well, I'll need to do it without the Binding. – Eshnelbek