I am trying to join a
- KStream: created from a topic, the topic has JSON value. I re-key the stream using
two attributed from the value. example value (snippet of the json). I created a custom pojo class and use a custom serdes.
{"value":"0","time":1.540753118800291E9,,"deviceIp":"111.111.111.111","deviceName":"KYZ1","indicatorName":"ifHCInOctets"}
keys are mapped as:
map((key, value) -> KeyValue.pair(value.deviceName+value.indicatorName, value))
I do a peek on the KStream and prints both key and the attributes I used. Looks all good.
- KTable: I create a ktable from a topic, I am writing to the topic using a python script and the key for the topic is
KYZ1ifHCInOctets
, the combination of device name and indicator name (from above). I do a toStream and then a peek on the resulting stream. Keys and values all seems fine.
Now when i do a inner join and do a peek or through/to a topic i see the key and values are mismatched. Join doesn't seems to work,
KStream<String, MyPojoClass> joined= datastream.join(table,
(data,table)->data
,Joined.with(Serdes.String(),myCustomSerde,Serdes.String())
);
key = XYZ1s1_TotalDiscards
Value = {"deviceName":"ABC2", "indicatorName":"jnxCosQstatTxedBytes"}
I have the exactly the same thing working through ksql, but wanted to do my own stream app.
(data,table)->""+data
. It's a shot into the dark to be honest... – Matthias J. Sax