0
votes

I have a Kafka topic that contains JSON data:

{"id": "A", "country": "France"}
{"id": "B", "currency": "£"}

And I want to normalize the content with something like a "reference table":

country ( "France" ) -> "FR"
currency ( "£" ) -> "GBP"

in order to output:

{"id": "A", "country": "FR"}
{"id": "B", "currency": "GBP"}

I think this is a typical use case for a KTable holding the reference data, but I am a little stuck on the implementation.

Current state

Ingesting the reference data

Dedicated topic created on Kafka: poc-mapping-in

Topic fed with sample JSON data:

{"mapping":"ccy",     "from":"£",      "to":"GBP"}
{"mapping":"country", "from":"France", "to":"FR"}

Data ingested into a KTable after reworking the key and value:

    // Re-key each mapping record as "<mapping>#<from>" and keep "to" as the value.
    KStream<String, String> mappingStream = builder
            .stream("poc-mapping-in", consumed)
            .map((key, value) -> KeyValue.pair(
                    value.get("mapping") + "#" + value.get("from"),
                    value.get("to").asText()));

    KGroupedStream<String, String> mappingGroupedStream = mappingStream.groupByKey(
            Serialized.with(Serdes.String(), Serdes.String()));

    // Keep only the latest value per key in the "ReferenceStore" state store.
    KTable<String, String> mappingTable = mappingGroupedStream.aggregate(
            () -> "",                                  // initializer
            (aggKey, newValue, aggValue) -> newValue,  // aggregator: last value wins
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ReferenceStore")
                    .withValueSerde(Serdes.String())
                    .withKeySerde(Serdes.String()));

    // Testing
    mappingTable.toStream().to("poc-mapping-in-content",
            Produced.with(Serdes.String(), Serdes.String()));

And in the topic poc-mapping-in-content, I get these lines:

"currency"#"£"      GBP
"country"#"France"  FR

That looks like what I expected. The double quotes are weird, but they do not block me from going further.

And the data is/should be stored in a local store called ReferenceStore.

Ingesting the business flow

Topic created on Kafka: poc-raw-events

Topic fed with sample JSON data:

{"id": "A", "country": "France"}
{"id": "B", "currency": "£"}

Data ingested into a KStream:

  final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
  KStream<String, JsonNode> businessData = builder.stream("poc-raw-events", consumed);

From here I do not know what to do. Technically, I know how to update an attribute in my JsonNode. So I tried to loop on the KStream with foreach, this way:

    businessData.foreach(new ForeachAction<String, JsonNode>() {
        public void apply(String k, JsonNode v) {
            System.out.println(k + " : " + v);
            if (v == null) {
                System.out.println("NULL detected");
                return;
            }
            Iterator<Entry<String, JsonNode>> fields = v.fields();
            int i = 0;
            while (fields.hasNext()) {
                i++;
                Entry<String, JsonNode> next = fields.next();
                System.out.println(k + " field #" + i + " : " + next.getKey() + " -- " + next.getValue());

                // Candidate lookup key for the reference data
                String key = next.getKey() + "#" + next.getValue();
//              ((ObjectNode) v).put(next.getKey(), "  WHAT HERE ??? ");
            }
        }
    });

My idea was to replace the " WHAT HERE ??? " in the last line with the data present in the reference KTable. But how?

  • I did not find something like a .findByKey() on a KTable.
  • I do not know how to access the ReferenceStore local store, because the way to access it is something like myKafkaStream.store(...), and at this point myKafkaStream is not started yet, nor is it even built.

Another option I thought about was the KStream leftJoin KTable capability. But I read somewhere (I did not bookmark it...) that for this to work, the stream and the table must use the same key. In my case, on the JSON side, I do not want to join on the record key but on a simple attribute.

How would you implement this?

2
Thank you Matthias. I solved the issue with a transform(.... , "ReferenceStore"). I will try to do it with a leftJoin, which is probably a more elegant way. – Val Bonn

2 Answers

4
votes

Since you are using reference data, I think what you want to consider is a GlobalKTable. A GlobalKTable is fully replicated on every KafkaStreams instance and was created explicitly for holding reference data in a use case like the one above.

What's unique about KStream-GlobalKTable joins is that you can map the key and value of each stream record to the key of the GlobalKTable. So as long as you can pull the attribute out of your JsonNode, you should be able to join with the appropriate record in the GlobalKTable.
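
To make the key mapping concrete, here is a minimal plain-Java sketch of the lookup such a join would perform. It deliberately uses no Kafka Streams classes: the reference table is modeled as a Map keyed by "field#value", and the event as a Map of attributes. The class and method names (ReferenceLookup, lookupKey, normalize) are hypothetical, and the keys are assumed to be stored without the surrounding double quotes seen in the question's output.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of the lookup; the Map stands in for the reference table.
class ReferenceLookup {

    // Builds the composite key used in the reference table, e.g. "country#France".
    static String lookupKey(String field, String value) {
        return field + "#" + value;
    }

    // Returns a copy of the event in which every attribute that has an entry
    // in the reference table is replaced by its mapped value.
    static Map<String, String> normalize(Map<String, String> event,
                                         Map<String, String> reference) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : event.entrySet()) {
            String mapped = reference.get(lookupKey(field.getKey(), field.getValue()));
            // Keep the original value when the table has no mapping (e.g. "id").
            result.put(field.getKey(), mapped != null ? mapped : field.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> reference = Map.of(
                "country#France", "FR",
                "currency#\u00a3", "GBP"); // \u00a3 is the pound sign
        Map<String, String> event = new LinkedHashMap<>();
        event.put("id", "A");
        event.put("country", "France");
        System.out.println(normalize(event, reference)); // {id=A, country=FR}
    }
}
```

In a real topology, lookupKey would correspond to the mapper that derives the table key from the stream record, and the replacement step to the joiner.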

0
votes

If referenceKTable has a key that matches data.getAltKey()

streamToMap.selectKey((originalKey, data) -> data.getAltKey()).leftJoin(referenceKTable, valueJoiner)

can accomplish that. The implementation of valueJoiner (or a lambda) must combine the two inputs.
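
As an illustration of what such a valueJoiner might do, here is a small plain-Java sketch. A BiFunction stands in for the joiner, and the event is modeled as a Map of attributes; both are assumptions made to keep the example self-contained, and the names LeftJoinSketch and COUNTRY_JOINER are hypothetical. The point it shows is that with a left join, the table-side value is null when no record matches, so the joiner has to handle that case.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

class LeftJoinSketch {

    // Stand-in for the valueJoiner: receives the stream value and the matching
    // table value, which is null when the left join finds no match.
    static final BiFunction<Map<String, String>, String, Map<String, String>> COUNTRY_JOINER =
            (event, mappedCountry) -> {
                Map<String, String> result = new LinkedHashMap<>(event);
                if (mappedCountry != null) {
                    result.put("country", mappedCountry);
                }
                return result;
            };

    public static void main(String[] args) {
        Map<String, String> event = new LinkedHashMap<>();
        event.put("id", "A");
        event.put("country", "France");
        System.out.println(COUNTRY_JOINER.apply(event, "FR")); // {id=A, country=FR}
        System.out.println(COUNTRY_JOINER.apply(event, null)); // {id=A, country=France}
    }
}
```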