
I am trying to join a KStream and a KTable:

  • KStream: created from a topic whose values are JSON. I re-key the stream using two attributes from the value. I created a custom POJO class and use a custom serde. Example value (snippet of the JSON):

      {"value":"0","time":1.540753118800291E9,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}

The keys are mapped as follows:

  map((key, value) -> KeyValue.pair(value.deviceName + value.indicatorName, value))

I do a peek on the KStream and print both the key and the attributes I used. It all looks good.

  • KTable: I create a KTable from a topic. I write to that topic using a Python script, and the key is KYZ1ifHCInOctets, the combination of device name and indicator name (from above). I do a toStream and then a peek on the resulting stream. Keys and values all seem fine.
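A minimal sketch of the value class and the key derivation, assuming plain Java fields. The names `DeviceMetric` and `joinKey` are hypothetical (the question's real class, `MyPojoClass`, isn't shown). It checks that `deviceName + indicatorName` produces exactly the key the Python script writes to the table topic; the join can only match when both keys serialize to the same bytes with `Serdes.String()`.

```java
public class RekeyDemo {
    public static void main(String[] args) {
        // Values taken from the JSON snippet in the question.
        DeviceMetric m = new DeviceMetric("0", 1.540753118800291E9,
                "111.111.111.111", "KYZ1", "ifHCInOctets");
        // Same expression as the map() call, wrapped in joinKey().
        System.out.println(m.joinKey()); // prints KYZ1ifHCInOctets
    }
}

// Hypothetical POJO mirroring the JSON fields. Every field is an *instance*
// field (no `static`), so each deserialized record keeps its own values.
class DeviceMetric {
    String value;
    double time;
    String deviceIp;
    String deviceName;
    String indicatorName;

    DeviceMetric(String value, double time, String deviceIp,
                 String deviceName, String indicatorName) {
        this.value = value;
        this.time = time;
        this.deviceIp = deviceIp;
        this.deviceName = deviceName;
        this.indicatorName = indicatorName;
    }

    // Mirrors KeyValue.pair(value.deviceName + value.indicatorName, value).
    String joinKey() {
        return deviceName + indicatorName;
    }
}
```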

Now when I do an inner join and then peek, or write the result through/to a topic, I see that the keys and values are mismatched. The join doesn't seem to work:

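  // Inner KStream-KTable join: emits a record only when the table holds the stream record's key.
  // The lambda parameter is renamed: a lambda parameter may not reuse the name of a local variable such as `table`.
  KStream<String, MyPojoClass> joined = datastream.join(table,
          (data, tableValue) -> data,   // keep the stream-side POJO as the join result
          Joined.with(Serdes.String(), myCustomSerde, Serdes.String()));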

Example output after the join:

  key   = XYZ1s1_TotalDiscards
  value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}
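A plain-Java model of KStream-KTable inner-join semantics may help narrow the problem down. It is not the Kafka Streams API (`InnerJoinSketch.join` is a hypothetical helper): each stream record looks up the table's current value for the same key, unmatched records are dropped, and the result keeps the stream record's key. Under this model the join cannot, by itself, attach one record's key to another record's value, so a mismatch like the one above more likely originates in the value objects or their serde.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class InnerJoinSketch {
    // Plain-Java model of a KStream-KTable inner join (not the Kafka Streams API):
    // each stream record looks up the *current* table value for its key.
    static <V, T, R> List<Map.Entry<String, R>> join(
            List<Map.Entry<String, V>> stream,
            Map<String, T> table,
            BiFunction<V, T, R> joiner) {
        List<Map.Entry<String, R>> out = new ArrayList<>();
        for (Map.Entry<String, V> rec : stream) {
            T tableValue = table.get(rec.getKey());
            if (tableValue != null) { // inner join: no table match, no output
                // The output keeps the stream record's key.
                out.add(new SimpleEntry<>(rec.getKey(), joiner.apply(rec.getValue(), tableValue)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("KYZ1ifHCInOctets", "table value");

        List<Map.Entry<String, String>> stream = new ArrayList<>();
        stream.add(new SimpleEntry<>("KYZ1ifHCInOctets", "KYZ1/ifHCInOctets"));
        stream.add(new SimpleEntry<>("XYZ1s1_TotalDiscards", "XYZ1/s1_TotalDiscards"));

        // Same shape as the question's joiner: keep the stream value.
        List<Map.Entry<String, String>> joined =
                join(stream, table, (data, tableValue) -> data);
        System.out.println(joined); // prints [KYZ1ifHCInOctets=KYZ1/ifHCInOctets]
    }
}
```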

I have exactly the same thing working in KSQL, but I wanted to write my own Streams app.

What do you mean by "see the key and values are mismatched"? Also, could it be that you run into timestamp synchronization issues? Note that a KTable is built up based on the changelog record timestamps, and the progress in building the table is synced with the time progress of processing the stream. There is no notion of "first loading the table" and then starting processing. (There is an improvement for this in the upcoming 2.1 release, though.) – Matthias J. Sax
The keys were derived from the values (combination of device and indicator name). So the data in the initial stream is something like key = XYZ1s1_TotalDiscards, device = XYZ1, indicator = s1_TotalDiscards. Notice that the key is the concatenation of device and indicator. But after the join, the resulting stream has a mismatch, and the data is something like key = XYZ1s1_totalHCOctets, device = XYZ1, indicator = ifOutDiscards – bp82
@MatthiasJ.Sax Did I explain myself? – bp82
Yes. The only idea that comes to my mind is potential object re-use. Can you try to copy the output you compute in the join via (data, table) -> "" + data? It's a shot in the dark, to be honest... – Matthias J. Sax
@MatthiasJ.Sax I noticed that after re-keying, the data in the stream looks all good, but when the stream is written to a topic and re-read, the keys and values don't match again. The same happens with the join. Not sure what this could be; in case it matters, the underlying Kafka is version 0.11.0.2, whilst the Kafka Streams code is written using Kafka 2.0. – bp82
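The timestamp point can be illustrated with a deliberately simplified plain-Java model: records from both topics are processed in timestamp order, so a stream record only finds table entries that were applied before it. The names `Event` and `run` are hypothetical, and the model ignores partitions, buffering, and other details of how Kafka Streams actually chooses the next record.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TimestampSyncSketch {
    // One input record; tableUpdate=true means it came from the table's topic.
    static final class Event {
        final long timestamp;
        final boolean tableUpdate;
        final String key;
        final String value;

        Event(long timestamp, boolean tableUpdate, String key, String value) {
            this.timestamp = timestamp;
            this.tableUpdate = tableUpdate;
            this.key = key;
            this.value = value;
        }
    }

    // Simplified model: all records are processed in timestamp order, so a stream
    // record only sees table updates that were applied before it.
    static List<String> run(List<Event> events) {
        List<Event> ordered = new ArrayList<>(events);
        ordered.sort(Comparator.comparingLong((Event e) -> e.timestamp)); // stable sort
        Map<String, String> table = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (Event e : ordered) {
            if (e.tableUpdate) {
                table.put(e.key, e.value);            // the table advances in time
            } else {
                String tableValue = table.get(e.key); // lookup at this point in time
                if (tableValue != null) {
                    joined.add(e.key + "=" + e.value + "+" + tableValue);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(10, true, "KYZ1ifHCInOctets", "meta"));  // table record
        events.add(new Event(5, false, "KYZ1ifHCInOctets", "early")); // stream, before table
        events.add(new Event(20, false, "KYZ1ifHCInOctets", "late")); // stream, after table
        System.out.println(run(events)); // prints [KYZ1ifHCInOctets=late+meta]
    }
}
```

The early stream record finds nothing to join with, even though the table record was produced to its topic first, because its timestamp is older.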
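The object re-use idea from the comments can be sketched in plain Java, assuming a single mutable instance is refilled for every record (the `Metric` class and `process` method are hypothetical). Collecting the instance itself means every collected output ends up showing the last record's fields, while collecting `"" + data` takes a `String` snapshot at the moment each record is processed, which is what the suggested joiner does.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseSketch {
    // Feeds all records through ONE reused Metric instance and returns what the
    // collected outputs look like once every record has been processed.
    static List<String> process(String[][] records, boolean copyOutput) {
        Metric reused = new Metric();
        List<Object> outputs = new ArrayList<>();
        for (String[] r : records) {
            reused.deviceName = r[0];
            reused.indicatorName = r[1];
            // false mimics (data, t) -> data; true mimics (data, t) -> "" + data
            outputs.add(copyOutput ? "" + reused : reused);
        }
        List<String> rendered = new ArrayList<>();
        for (Object o : outputs) {
            rendered.add(o.toString());
        }
        return rendered;
    }

    public static void main(String[] args) {
        String[][] records = {{"XYZ1", "s1_TotalDiscards"}, {"ABC2", "jnxCosQstatTxedBytes"}};
        System.out.println(process(records, false)); // prints [ABC2/jnxCosQstatTxedBytes, ABC2/jnxCosQstatTxedBytes]
        System.out.println(process(records, true));  // prints [XYZ1/s1_TotalDiscards, ABC2/jnxCosQstatTxedBytes]
    }
}

// Hypothetical mutable value type standing in for the question's POJO.
class Metric {
    String deviceName;
    String indicatorName;

    @Override
    public String toString() {
        return deviceName + "/" + indicatorName;
    }
}
```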

1 Answer


In hindsight the error sounds so stupid: a few of the attributes in my POJO class were declared static :-(, which resulted in wrong keys.
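A plain-Java sketch of why `static` attributes produce wrong keys. The class names are hypothetical, and the answer doesn't say which fields were static, so here only `deviceName` is. A static field has a single copy shared by all instances, so each newly created (for example, deserialized) record overwrites it for every record already read; a key built from it can pair one record's device with another record's indicator, much like the mismatch described in the comments.

```java
public class StaticFieldBug {
    public static void main(String[] args) {
        // Two records created one after another, e.g. by a deserializer.
        BrokenMetric a = new BrokenMetric("XYZ1", "s1_TotalDiscards");
        BrokenMetric b = new BrokenMetric("ABC2", "jnxCosQstatTxedBytes");
        // a now reports b's deviceName: the static field was overwritten.
        System.out.println(a.key()); // prints ABC2s1_TotalDiscards

        FixedMetric c = new FixedMetric("XYZ1", "s1_TotalDiscards");
        FixedMetric d = new FixedMetric("ABC2", "jnxCosQstatTxedBytes");
        // c keeps its own values even after d is created.
        System.out.println(c.key()); // prints XYZ1s1_TotalDiscards
    }
}

// Hypothetical reconstruction of the bug: one attribute declared static.
class BrokenMetric {
    static String deviceName; // BUG: a single copy shared by every instance
    String indicatorName;     // instance field: one copy per object

    BrokenMetric(String deviceName, String indicatorName) {
        BrokenMetric.deviceName = deviceName; // overwrites the value for ALL instances
        this.indicatorName = indicatorName;
    }

    String key() {
        return deviceName + indicatorName;
    }
}

// The fix: plain instance fields.
class FixedMetric {
    String deviceName;
    String indicatorName;

    FixedMetric(String deviceName, String indicatorName) {
        this.deviceName = deviceName;
        this.indicatorName = indicatorName;
    }

    String key() {
        return deviceName + indicatorName;
    }
}
```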