I left-join a KStream with a GlobalKTable, but I don't see any output in the output topic:
val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[java.lang.Long] = Serdes.Long()
val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()
val builder = new KStreamBuilder()
val networkImprStream: KStream[java.lang.Long, GenericRecord] = builder
  .stream(dfpGcsNetworkImprEnhanced)
// Create a global table for advertisers. The data from this global table
// will be fully replicated on each instance of this application.
val advertiserTable: GlobalKTable[java.lang.Long, GenericRecord] = builder.globalTable(advertiserTopicName, "advertiser-store")
// Join the network impr stream to the advertiser global table. As this is a global table,
// we can use a non-key-based join without needing to repartition the input stream.
val networkImprWithAdvertiserNameKStream: KStream[java.lang.Long, GenericRecord] = networkImprStream.leftJoin(advertiserTable,
(_, networkImpr) => {
println(networkImpr)
networkImpr.get("advertiserId").asInstanceOf[java.lang.Long]
},
(networkImpr: GenericRecord, advertiserIdToName: GenericRecord) => {
println(networkImpr)
networkImpr.put("advertiserName", advertiserIdToName.get("name"))
networkImpr
}
)
networkImprWithAdvertiserNameKStream.to(networkImprProcessed)
val streams = new KafkaStreams(builder, streamsConfiguration)
streams.cleanUp()
streams.start()
// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(15000L)
If I bypass the join and write the input topic directly to the output topic, I see messages arriving. I have already changed the join to a left join and added some println calls to see when the key is extracted, but nothing is printed to the console. I also run the Kafka Streams application reset tool before every run, so the application starts from the beginning of the input. I have added some test access to the store as well: it works, and the store contains keys from the stream (although, with a left join, missing keys should not suppress output anyway). I am running out of ideas here.
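One detail related to the left join: when a stream record has no matching entry in the GlobalKTable, Kafka Streams passes null as the table-side value to the ValueJoiner, so the joiner shown above would throw on a non-matching record. This probably does not explain the missing output, since the println calls are never reached, but a null-safe joiner rules it out. The following is only a minimal sketch; the names JoinSketch and nullSafeJoiner are hypothetical and not part of the original application.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.streams.kstream.ValueJoiner

// Hypothetical wrapper object, used only so the sketch compiles on its own.
object JoinSketch {

  // Left-join value joiner that tolerates a missing advertiser:
  // the table-side value is null when no match was found in the global table.
  val nullSafeJoiner: ValueJoiner[GenericRecord, GenericRecord, GenericRecord] =
    new ValueJoiner[GenericRecord, GenericRecord, GenericRecord] {
      override def apply(networkImpr: GenericRecord, advertiser: GenericRecord): GenericRecord = {
        // Option(null) is None, so advertiserName is only set when a match exists.
        Option(advertiser).foreach(a => networkImpr.put("advertiserName", a.get("name")))
        networkImpr
      }
    }
}
```

It could be passed as the third argument of leftJoin in place of the inline lambda, for example `networkImprStream.leftJoin(advertiserTable, keyMapper, JoinSketch.nullSafeJoiner)`, where keyMapper stands for the existing key-extraction function.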