I have a Kafka topic that contains JSON data:
{"id": "A", "country": "France"}
{"id": "B", "currency": "£"}
I want to normalize the content using something like a "reference table":
country ( "France" ) -> "FR"
currency ( "£" ) -> "GBP"
in order to get this output:
{"id": "A", "country": "FR"}
{"id": "B", "currency": "GBP"}
I think this is a typical use case for a KTable holding the reference data, but I am a bit stuck on the implementation.
Current state
Ingesting the reference data
Dedicated topic created on Kafka: poc-mapping-in
Topic fed with sample JSON data:
{"mapping":"ccy", "from":"£", "to":"GBP"}
{"mapping":"country", "from":"France", "to":"FR"}
Data ingested into a KTable after reworking the key and value:
KStream<String, String> mappingStream = builder
    .stream("poc-mapping-in", consumed)
    .map((key, value) -> KeyValue.pair(
        value.get("mapping") + "#" + value.get("from"), // new key: mapping#from
        value.get("to").asText()));                      // new value: target code

// Grouped replaces Serialized, which is deprecated since Kafka 2.1
KGroupedStream<String, String> mappingGroupedStream = mappingStream.groupByKey(
    Grouped.with(Serdes.String(), Serdes.String()));

KTable<String, String> mappingTable = mappingGroupedStream.aggregate(
    () -> "",                                 // initializer
    (aggKey, newValue, aggValue) -> newValue, // aggregator: keep the latest value
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ReferenceStore")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()));

// Testing
mappingTable.toStream().to("poc-mapping-in-content",
    Produced.with(Serdes.String(), Serdes.String()));
And in the topic poc-mapping-in-content, I get these lines:
"currency"#"£" GBP
"country"#"France" FR
That looks like what I expected. The double quotes are odd, but they do not stop me from going further.
The data is (or should be) stored in a local state store called ReferenceStore.
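The double quotes mentioned above most likely come from the key expression: value.get(...) returns a JsonNode, and string concatenation calls JsonNode.toString(), which renders a text node as a JSON literal with its quotes, whereas asText() returns the bare string. A minimal sketch, assuming Jackson databind on the classpath (the class and helper names are illustrative):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;

public class MappingKeyDemo {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Illustrative helper: parse JSON without a checked exception.
    static JsonNode parse(String json) {
        try {
            return MAPPER.readTree(json);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same expression as in the question: concatenation calls JsonNode.toString(),
    // which keeps the JSON quotes around text values.
    static String keyWithToString(JsonNode value) {
        return value.get("mapping") + "#" + value.get("from");
    }

    // asText() returns the raw content of a text node, without quotes.
    static String keyWithAsText(JsonNode value) {
        return value.get("mapping").asText() + "#" + value.get("from").asText();
    }

    public static void main(String[] args) {
        JsonNode value = parse("{\"mapping\":\"country\", \"from\":\"France\", \"to\":\"FR\"}");
        System.out.println(keyWithToString(value)); // prints "country"#"France"
        System.out.println(keyWithAsText(value));   // prints country#France
    }
}
```

Building the key with asText() in the map(...) call gives keys such as country#France, which are also easier to rebuild from a business record at lookup time.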
Ingesting the business flow
Topic created on Kafka: poc-raw-events
Topic fed with sample JSON data:
{"id": "A", "country": "France"}
{"id": "B", "currency": "£"}
Data ingested into a KStream:
final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
KStream<String, JsonNode> businessData = builder.stream("poc-raw-events", consumed);
From here I do not know what to do. Technically, I know how to update an attribute in my JsonNode, so I tried to loop over the KStream with foreach, like this:
businessData.foreach(new ForeachAction<String, JsonNode>() {
    public void apply(String k, JsonNode v) {
        System.out.println(k + " : " + v);
        if (v == null) { System.out.println("NULL detected"); return; }
        Iterator<Entry<String, JsonNode>> fields = v.fields();
        int i = 0;
        while (fields.hasNext()) {
            i++;
            Entry<String, JsonNode> next = fields.next();
            System.out.println(k + " field #" + i + " : " + next.getKey() + " -- " + next.getValue());
            String key = next.getKey() + "#" + next.getValue(); // lookup key: field#value
            // ((ObjectNode) v).put(next.getKey(), " WHAT HERE ??? ");
        }
    }
});
My idea was to replace " WHAT HERE ??? " in the commented-out line with the matching value from the reference KTable. But how?
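Note that KStream#foreach is a terminal operation (it returns void), so even a successful in-place edit of v would never reach an output topic; mapValues(...) returns a new KStream instead. The lookup itself could look like the sketch below, where a plain Map stands in for the reference data (an assumption: getting the table contents into this function is exactly the open question) and keys use the unquoted field#value form. The class and method names are hypothetical; Jackson databind and Java 9+ (for Map.of) are assumed:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Map;

public class FieldNormalizer {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static JsonNode parse(String json) {
        try {
            return MAPPER.readTree(json);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns a normalized copy; the input node is left untouched.
    // `reference` is a hypothetical stand-in for the KTable: "field#from" -> "to".
    static JsonNode normalize(JsonNode value, Map<String, String> reference) {
        if (value == null || !value.isObject()) {
            return value; // nothing to normalize
        }
        ObjectNode copy = ((ObjectNode) value).deepCopy();
        Iterator<Map.Entry<String, JsonNode>> fields = value.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            // Same composite key as the reference data, e.g. "country#France".
            String to = reference.get(field.getKey() + "#" + field.getValue().asText());
            if (to != null) {
                copy.put(field.getKey(), to); // replace the "WHAT HERE" placeholder
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, String> reference = Map.of("country#France", "FR", "currency#\u00a3", "GBP");
        System.out.println(normalize(parse("{\"id\":\"A\",\"country\":\"France\"}"), reference));
        // prints {"id":"A","country":"FR"}
    }
}
```

In a topology, the same logic would run inside mapValues(...), a join's ValueJoiner, or a transformer that reads the store, rather than in foreach.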
- I did not find anything like a .findByKey() on a KTable.
- I do not know how to access the ReferenceStore local store, because the way to access it is something like myKafkaStream.store(...), and at this point myKafkaStream is not started yet; it is not even built.
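Inside a topology, a store is normally not read through KafkaStreams#store(...) (the interactive-query API, usable only once the application is running) but through the ProcessorContext of a transformer the store is connected to, which is what the transform(.... , "ReferenceStore") suggestion in the comments points at. A sketch under these assumptions: Kafka Streams 2.3 to 3.x (transformValues exists there; newer releases deprecate it in favour of processValues), the KTable store assumed to be exposed to processors as a TimestampedKeyValueStore (on versions before 2.3 it would be a plain KeyValueStore), connect-json for the JsonNode serde, and a hypothetical output topic poc-normalized-events. Class names are illustrative:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class StoreLookupTopology {
    static final String STORE = "ReferenceStore";

    // Reads "field#value" keys from the connected KTable store.
    static class NormalizeTransformer implements ValueTransformer<JsonNode, JsonNode> {
        private TimestampedKeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (TimestampedKeyValueStore<String, String>) context.getStateStore(STORE);
        }

        @Override
        public JsonNode transform(JsonNode value) {
            if (value == null || !value.isObject()) return value;
            ObjectNode copy = ((ObjectNode) value).deepCopy();
            Iterator<Map.Entry<String, JsonNode>> fields = value.fields();
            while (fields.hasNext()) {
                Map.Entry<String, JsonNode> f = fields.next();
                String to = ValueAndTimestamp.getValueOrNull(
                    store.get(f.getKey() + "#" + f.getValue().asText()));
                if (to != null) copy.put(f.getKey(), to);
            }
            return copy;
        }

        @Override
        public void close() {}
    }

    public static Topology build() {
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        StreamsBuilder builder = new StreamsBuilder();

        // Reference table "mapping#from" -> "to"; reduce keeps the latest value,
        // like aggregate(() -> "", (k, v, agg) -> v) in the question.
        builder.stream("poc-mapping-in", Consumed.with(Serdes.String(), jsonSerde))
            .filter((k, v) -> v != null)
            .map((k, v) -> KeyValue.pair(v.path("mapping").asText() + "#" + v.path("from").asText(),
                                         v.path("to").asText()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .reduce((oldValue, newValue) -> newValue,
                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));

        // Passing the store name connects the store to the transformer.
        builder.stream("poc-raw-events", Consumed.with(Serdes.String(), jsonSerde))
            .transformValues(NormalizeTransformer::new, STORE)
            .to("poc-normalized-events", Produced.with(Serdes.String(), jsonSerde));
        return builder.build();
    }
}
```

One caveat with a local KTable store: each task only holds the store partitions it owns, while poc-raw-events is keyed by id rather than by mapping#from, so lookups can miss entries unless the topics have a single partition.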
Another option I considered is the KStream-KTable leftJoin. But I read somewhere (I did not bookmark it) that this requires the same key on both sides. In my case, on the JSON side, the value to join on is not the record key but a plain attribute.
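The same-key requirement applies to KStream-KTable joins, but if the reference data is read as a GlobalKTable, KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner) derives the lookup key from each record's key and value, and every instance holds the whole table, so no co-partitioning is needed. A sketch under stated assumptions: the re-keyed reference data is written to a hypothetical compacted topic poc-mapping-keyed, the mapping names match the JSON field names (as the currency key in the poc-mapping-in-content output suggests), one leftJoin is chained per normalizable field because each join performs a single lookup, and the output topic poc-normalized-events is hypothetical:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Produced;

public class NormalizeWithGlobalTable {

    // Lookup key "<field>#<raw value>". path() yields a MissingNode (asText() is "")
    // when the field is absent, so the key never matches and the record passes through.
    static String lookupKey(String field, JsonNode value) {
        return value == null ? field + "#" : field + "#" + value.path(field).asText();
    }

    // Returns a copy with `field` replaced, or the input unchanged when there is no match.
    static JsonNode replaceField(JsonNode value, String field, String normalized) {
        if (value == null || normalized == null || !value.has(field)) {
            return value;
        }
        ObjectNode copy = ((ObjectNode) value).deepCopy();
        copy.put(field, normalized);
        return copy;
    }

    public static Topology build() {
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        StreamsBuilder builder = new StreamsBuilder();

        // 1) Re-key the reference data to "<mapping>#<from>" -> "<to>" (no JSON quotes).
        builder.stream("poc-mapping-in", Consumed.with(Serdes.String(), jsonSerde))
            .filter((k, v) -> v != null)
            .map((k, v) -> KeyValue.pair(
                v.path("mapping").asText() + "#" + v.path("from").asText(),
                v.path("to").asText()))
            .to("poc-mapping-keyed", Produced.with(Serdes.String(), Serdes.String()));

        // 2) Every application instance loads the whole topic into the GlobalKTable.
        GlobalKTable<String, String> reference = builder.globalTable(
            "poc-mapping-keyed", Consumed.with(Serdes.String(), Serdes.String()));

        // 3) One leftJoin per field; the KeyValueMapper builds the lookup key from the value.
        builder.stream("poc-raw-events", Consumed.with(Serdes.String(), jsonSerde))
            .leftJoin(reference,
                (k, v) -> lookupKey("country", v),
                (v, to) -> replaceField(v, "country", to))
            .leftJoin(reference,
                (k, v) -> lookupKey("currency", v),
                (v, to) -> replaceField(v, "currency", to))
            .to("poc-normalized-events", Produced.with(Serdes.String(), jsonSerde));

        return builder.build();
    }
}
```

The returned Topology would be passed to new KafkaStreams(topology, props) and started as usual. The trade-off is that a GlobalKTable replicates all reference data to every instance, which seems acceptable for a small mapping table like this one.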
How would you implement this?
stream.join(table): docs.confluent.io/current/streams/developer-guide/… -- also check out: confluent.io/blog/crossing-streams-joins-apache-kafka – Matthias J. Sax
transform(.... , "ReferenceStore"). I will try to do it with a leftJoin, which is probably a more elegant way. – Val Bonn