0
votes

I am trying to use a KStream-to-KTable join to enrich a Kafka topic. For my proof of concept, the KStream is built from a topic with about 600,000 records that all share the same key, and the KTable is built from a topic containing a single key-value record whose key matches the key of those 600,000 records.

When I use a left join (code below), the ValueJoiner receives null for the KTable value on every record.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe-json-parse-" + System.currentTimeMillis());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xx.xx.xxx:9092");        
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());   
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);


final StreamsBuilder builder = new StreamsBuilder();
// Build a Kafka Stream from the Netcool Input Topic
KStream<String, String> source = builder.stream("output-100k");


// Join the KStream to the KTable
KStream<String, String> enriched_output = source
    .leftJoin(netcool_enrichment, (orig_msg, description) -> {
        String new_msg = jsonEnricher(orig_msg, description);
        if (description != null) {
            System.out.println("\n[DEBUG] Enriched Input Orig: " + orig_msg);
            System.out.println("[DEBUG] Enriched Input Desc: " + description);                
            System.out.println("[DEBUG] Enriched Output: " + new_msg);
        }
        return new_msg;
    });

Here is a sample record from the source KStream, printed with foreach:

[KSTREAM] Key: ismlogs
[KSTREAM] Value: {"severity":"debug","ingested_timestamp":"2018-07-18T19:32:47.227Z","@timestamp":"2018-06-28T23:36:31.000Z","offset":482,"@metadata":{"beat":"filebeat","topic":"input-100k","type":"doc","version":"6.2.2"},"beat":{"hostname":"abc.dec.com","name":"abc.dec.com","version":"6.2.2"},"source":"/root/100k-raw.txt","message":"Thu Jun 28 23:36:31 2018 Debug: Checking status of file /ism/profiles/active/test.xml","key":"ismlogs","tags":["ismlogs"]}

I also tried converting the KTable back to a KStream and printing it with foreach, which confirms that the record is actually present in the KTable.

KTable<String, String> enrichment = builder.table("enrichment");
KStream<String, String> ktable_debug = enrichment.toStream();
ktable_debug.foreach(new ForeachAction<String, String>() {
    public void apply(String key, String value) {
        System.out.println("[KTABLE] Key: " + key);
        System.out.println("[KTABLE] Value: " + value);
    }
 });

The code above outputs:

[KTABLE] Key: "ismlogs"
[KTABLE] Value: "ISM Logs"
1
Hi. Can you share the code that loads the KTable? And what happens if you put a System.out.println in an "else" counterpart of your "if (description != null)"? I think this output won't be printed either. -> Have a look at the keys in your KTABLE trace and in your KSTREAM trace. You will see that the key in the table is double-quoted (in Java your key is "\"ismlogs\""), whereas it is a "raw string" in your KStream. I had the same behavior when building my key as a String extracted from a JSON payload. - Val Bonn
You nailed it - I thought the key-value pair sent via the Console Producer needed to be quoted, but that was my issue the entire time. Thank you! - mattdonders

1 Answer

3
votes

According to your console output, the keys are different, and therefore the records won't join:

[KSTREAM] Key: ismlogs
[KTABLE] Key: "ismlogs"

In the case of the KTable, the key is literally "ismlogs", including the double quotes, so it is not equal to the unquoted key ismlogs on the stream side.
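
To make the mismatch concrete, here is a minimal sketch in plain Java with no Kafka dependency. It uses a Map as a stand-in for the KTable lookup, on the assumption that a left join simply hands null to the ValueJoiner when the table has no entry for the stream key. The class name QuotedKeyJoinDemo and the helper stripQuotes are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class QuotedKeyJoinDemo {

    // Hypothetical helper: removes one pair of surrounding double quotes, if present.
    static String stripQuotes(String s) {
        if (s != null && s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) {
            return s.substring(1, s.length() - 1);
        }
        return s;
    }

    // Stand-in for the table side of a left join: a missing key yields null.
    static String lookup(Map<String, String> table, String streamKey) {
        return table.get(streamKey);
    }

    public static void main(String[] args) {
        // The table record as it was produced, with the quotes as part of the data.
        Map<String, String> table = new HashMap<>();
        table.put("\"ismlogs\"", "\"ISM Logs\"");

        // The stream key has no quotes, so the lookup misses.
        System.out.println(lookup(table, "ismlogs")); // prints "null"

        // After normalizing keys and values, the same lookup matches.
        Map<String, String> normalized = new HashMap<>();
        table.forEach((k, v) -> normalized.put(stripQuotes(k), stripQuotes(v)));
        System.out.println(lookup(normalized, "ismlogs")); // prints "ISM Logs"
    }
}
```

The simplest fix is the one from the comments: produce the enrichment record without quotes around the key and value. If the topic cannot be rewritten, a similar normalization could probably be applied inside the topology, for example with KStream#selectKey before the data is turned back into a table, though re-keying triggers a repartition and I have not tested that against this setup.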