I'm very new to Kafka Streams and encountered a problem.

I have two tables - one is for long-term data (descriptions) and the other is for live data (live). They have a common id.

The idea is to store data from descriptions (presumably in a KTable, keeping the latest description for each id) and, when new messages appear in live, join them with the data from descriptions on the corresponding id and send the result further.

For simplicity let's just make all types String.

So the basic idea was like in every tutorial I've seen:

    interface Processor {

        @Input("live")
        KStream<String, String> live();

        // Renamed from input(): two methods with the same signature would not compile.
        @Input("descriptions")
        KTable<String, String> descriptions();

        @Output("output")
        KStream<String, String> output();
    }

And then:

    @StreamListener
    @SendTo("output")
    public KStream<String, String> process(
            @Input("live") KStream<String, String> live,
            @Input("descriptions") KTable<String, String> descriptions) {
        // ...
    }

The problem is that the descriptions topic is not KTable-suitable (null keys, just messages).

So I can't use it as a KTable input, and I can't create any new intermediate topics to store a valid keyed stream built from it (basically read-only).

I was searching for some sort of in-memory Binding destination, but to no avail.

The way I thought it could be possible is something like creating an intermediate output that just stores KTable in-memory or something and then using this intermediate as an input in live processing. Like:

    @StreamListener("descriptions")
    @SendTo("intermediate")
    public KTable<String, String> process(@Input("descriptions") KStream<String, String> descriptions) {
        // ...
    }

I hope this is possible with the binding semantics.

It's not clear what you mean. How can you "join" an event to a description without the description having a key - which description from the table should be bound to the event? How would it be selected? - Gary Russell

@GaryRussell Exactly, so I want to consume this "unkeyed" stream and map it to a KTable with a key and this one is to be used for joining. Basically I need (or so I think) a way to persist an intermediate KTable so I can use as input further down the pipeline. - Eshnelbek

But how will you generate the key from a description? - Gary Russell

@GaryRussell Something like (sorry for formatting):

    @StreamListener("descriptions")
    @Output("intermediate")
    public KTable<String, String> process(KStream<String, String> descriptions) {
        return descriptions
                .map((key, value) -> {
                    Pojo p = parseIntoPojo(value);
                    return new KeyValue<>(p.getId(), value);
                })
                .groupByKey()
                .reduce((v1, v2) -> v2);
    }

If that's not possible, well, need to do it without the Binding. - Eshnelbek

1 Answer

I think you can add an initial processor that writes the keyed descriptions to an intermediate topic, and then consume that topic as a table in your regular processor. Here are some templates. I am using the new functional model in Spring Cloud Stream to write these processors.

@Bean
public Function<KStream<String, String>, KStream<String, String>> processDescriptions() {

    return descriptions ->
        descriptions
            // Re-key each record by the id parsed out of its value.
            .map((key, value) -> {
                Pojo p = parseIntoPojo(value);
                return new KeyValue<>(p.getId(), value);
            })
            .groupByKey()
            // Keep only the latest description per id.
            .reduce((v1, v2) -> v2)
            .toStream();
}

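@Bean
public BiFunction<KStream<String, String>, KTable<String, String>, KStream<String, String>> realStream() {

    // Placeholder join: replace the value joiner with whatever combination you need.
    return (live, descriptions) ->
            live.join(descriptions, (liveValue, description) -> liveValue + " " + description);
}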

The first processor receives the descriptions as a KStream, enriches each record with a key, and outputs the result as a KStream. Now that this topic has both a key and a value, we can consume it as a KTable in the next processor. The next processor is a java.util.function.BiFunction that receives two inputs and generates one output: the inputs are a KStream and a KTable respectively, and the output is a KStream.
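To make the effect of the first processor concrete, here is a plain-Java model of what re-keying followed by reduce((v1, v2) -> v2) amounts to. It is only a sketch with no Kafka dependencies: the class name, the latestById helper, and the "<id>|<text>" value format handled by idOf are all assumptions made for illustration (idOf stands in for parseIntoPojo(value).getId()).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class LatestDescriptionModel {

    // Hypothetical stand-in for parseIntoPojo(value).getId():
    // assumes every value looks like "<id>|<description text>".
    static String idOf(String value) {
        return value.substring(0, value.indexOf('|'));
    }

    // Re-key each unkeyed value by its id and keep only the newest value per id.
    static Map<String, String> latestById(List<String> values,
                                          Function<String, String> keyExtractor) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String value : values) {
            // A later value for the same id overwrites the earlier one.
            table.put(keyExtractor.apply(value), value);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> descriptions = List.of("a|first", "b|only", "a|second");
        Map<String, String> table = latestById(descriptions, LatestDescriptionModel::idOf);
        System.out.println(table); // {a=a|second, b=b|only}
    }
}
```

The resulting map plays the role of the table: one entry per id, always holding the most recent description seen for it.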

You can set destinations on them as below:

spring.cloud.stream.function.definition=processDescriptions;realStream

spring.cloud.stream.bindings.processDescriptions-in-0.destination=description-topic
spring.cloud.stream.bindings.processDescriptions-out-0.destination=description-table-topic

spring.cloud.stream.bindings.realStream-in-0.destination=live-topic
spring.cloud.stream.bindings.realStream-in-1.destination=description-table-topic
spring.cloud.stream.bindings.realStream-out-0.destination=output

You can achieve the same results by using the StreamListener approach as well.

The downside of this approach is that you need to maintain an extra intermediate topic in Kafka, but if you really want it as a KTable and the underlying information is non-keyed, I don't think there are too many options here.

If you don't need the descriptions as a top-level KTable, you might be able to keep them in a state store and query that store later, all within a single processor. I haven't tried that out, so you will need to experiment with the idea. Basically, you get two streams, live and descriptions:

(live, descriptions) -> reduce the descriptions by key and keep the result in a state store;
then process live by joining each record with what is in the state store.
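The single-processor idea can be modelled in plain Java as follows. This is a rough sketch of the semantics only, not Kafka Streams code: the HashMap stands in for the state store, and EnrichmentModel, onDescription, and onLive are hypothetical names. It also assumes the description id has already been extracted and that live records carry the same id as their key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class EnrichmentModel {

    // Stand-in for the state store: latest description per id.
    private final Map<String, String> descriptionStore = new HashMap<>();

    // Called for every record on the descriptions stream.
    void onDescription(String id, String description) {
        descriptionStore.put(id, description); // newest description wins
    }

    // Called for every record on the live stream; looks up the store at arrival time.
    Optional<String> onLive(String id, String liveValue) {
        String description = descriptionStore.get(id);
        if (description == null) {
            return Optional.empty(); // no description yet: nothing is emitted
        }
        return Optional.of(liveValue + " | " + description);
    }

    public static void main(String[] args) {
        EnrichmentModel model = new EnrichmentModel();
        System.out.println(model.onLive("42", "price=10")); // Optional.empty
        model.onDescription("42", "Widget");
        model.onDescription("42", "Widget v2");
        System.out.println(model.onLive("42", "price=11")); // Optional[price=11 | Widget v2]
    }
}
```

Note that a live record arriving before its description is dropped here, which mirrors an inner stream-table join; if such records should still be forwarded, a left join (emitting them with a null description) is the closer analogue.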

Kafka Streams allows various ways to accomplish things like that. Check their reference docs for more info.

Hope this helps.