I'm learning Kafka Streams and am trying to achieve the following:
I created 2 Kafka topics (say topic1, topic2) with null as the key and a JSON string as the value. Each record from topic1 (no duplicates) has multiple matching entries in topic2. That is, topic1 holds the main data, which should generate multiple new records when joined with topic2.
Example:
topic1={"name": "abc", "age": 2}, {"name": "xyz", "age": 3} and so on.
topic2={"name": "abc", "address": "xxxxxx"}, {"name": "abc", "address": "yyyyyy"}, {"name": "xyz", "address": "jjjjjj"}, {"name": "xyz", "address": "xxxkkkkk"}
Expected output: {"name": "abc", "age": 2, "address": "xxxxxx"}, {"name": "abc", "age": 2, "address": "yyyyyy"}, {"name": "xyz", "age": 3, "address": "jjjjjj"}, {"name": "xyz", "age": 3, "address": "xxxkkkkk"}
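To make the expected record concrete: the result is just a shallow field union of a topic1 value and a topic2 value. A minimal sketch with plain Java Maps (the `merge` helper is hypothetical, only to illustrate the semantics I want; the real code would operate on JsonNode):

```java
import java.util.HashMap;
import java.util.Map;

public class MergeSketch {
    // Hypothetical helper: shallow union of two records. Fields from the
    // topic2 side overwrite clashes ("name" is the same on both sides), and a
    // null right side (left-join miss) leaves the topic1 record unchanged.
    static Map<String, Object> merge(Map<String, Object> left, Map<String, Object> right) {
        Map<String, Object> out = new HashMap<>(left);
        if (right != null) {
            out.putAll(right);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> person = new HashMap<>();
        person.put("name", "abc");
        person.put("age", 2);

        Map<String, Object> address = new HashMap<>();
        address.put("name", "abc");
        address.put("address", "xxxxxx");

        // The union carries "name", "age" and "address"
        System.out.println(merge(person, address));
    }
}
```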
I would like to persist/hold the data-stream from topic1 for future reference, while the data-stream from topic2 is only used for the above use-case and doesn't require any persistence/holding back.
I have a few questions: 1) Can I hold/store the topic1 data stream for a few days (is that possible?) so that the incoming data stream from topic2 can be joined against it? 2) What should I use to achieve this, a KStream or a KTable? 3) Is this what's called a backpressure mechanism?
Does Kafka Streams support this use-case, or should I look for something else? Please suggest.
I have tried the code below with a KStream and a 5-minute join window, but it looks like I'm not able to hold on to the topic1 data in the stream.
Please help me with the right choice and join. I'm using Confluent's Kafka with a Docker instance.
public void run() {
    final StreamsBuilder builder = new StreamsBuilder();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
    final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);

    // Want to hold data from this topic for 30 days
    KStream<String, JsonNode> cs = builder.stream("topic1", consumed);
    cs.foreach((k, v) -> System.out.println(k + " ---> " + v));

    // Data from this topic is involved in a one-time process only.
    KStream<String, JsonNode> css = builder.stream("topic2", consumed);
    css.foreach((k, v) -> System.out.println(k + " ---> " + v));

    // valueJoiner (defined elsewhere) merges the two JSON values into one record
    KStream<String, JsonNode> resultStream = cs.leftJoin(css,
            valueJoiner,
            JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
            Joined.with(
                    Serdes.String(), /* key */
                    jsonSerde,       /* left value */
                    jsonSerde));     /* right value */

    resultStream.foreach((k, v) -> System.out.println("JOIN STREAM: KEY=" + k + ", VALUE=" + v));

    KafkaStreams streams = new KafkaStreams(builder.build(), properties);
    streams.start();
}
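For comparison, this is the KTable-based variant I'm wondering about (untested sketch, not what I'm running; it assumes Kafka Streams 2.1+ for `Grouped.with`, and that both topics are first re-keyed by the "name" field, since my keys are currently null):

```java
// Hold topic1 in a KTable (backed by a state store, retained indefinitely,
// so no join window is needed) and join the topic2 stream against it.
KTable<String, JsonNode> people = builder.stream("topic1", consumed)
        .selectKey((k, v) -> v.get("name").asText())           // re-key by "name"
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .reduce((oldValue, newValue) -> newValue);             // keep latest per name

KStream<String, JsonNode> addresses = builder.stream("topic2", consumed)
        .selectKey((k, v) -> v.get("name").asText());          // re-key by "name"

// Stream-table left join: every topic2 record pairs with the current topic1 row.
KStream<String, JsonNode> result = addresses.leftJoin(people, valueJoiner);
```

Is this the right direction for holding topic1 data without a window?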