I left-join a KStream with a GlobalKTable, but no records arrive in the output topic:

  val stringSerde: Serde[String] = Serdes.String()
  val longSerde: Serde[java.lang.Long] = Serdes.Long()
  val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()

  val builder = new KStreamBuilder()

  val networkImprStream: KStream[java.lang.Long, GenericRecord] = builder
    .stream(dfpGcsNetworkImprEnhanced)

  // Create a global table for advertisers. The data from this global table
  // will be fully replicated on each instance of this application.
  val advertiserTable: GlobalKTable[java.lang.Long, GenericRecord] = builder.globalTable(advertiserTopicName, "advertiser-store")

  // Join the network impr stream to the advertiser global table. As this is a global table,
  // we can use a non-key based join without needing to repartition the input stream.
  val networkImprWithAdvertiserNameKStream: KStream[java.lang.Long, GenericRecord] = networkImprStream.leftJoin(advertiserTable,
    (_, networkImpr) => {
      println(networkImpr)
      networkImpr.get("advertiserId").asInstanceOf[java.lang.Long]
    },
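    (networkImpr: GenericRecord, advertiserIdToName: GenericRecord) => {
      println(networkImpr)
      // In a left join the table-side value is null when no advertiser matches.
      if (advertiserIdToName != null) {
        networkImpr.put("advertiserName", advertiserIdToName.get("name"))
      }
      networkImpr
    }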
  )

  networkImprWithAdvertiserNameKStream.to(networkImprProcessed)

  val streams = new KafkaStreams(builder, streamsConfiguration)
  streams.cleanUp()
  streams.start()
  // usually the stream application would be running forever,
  // in this example we just let it run for some time and stop since the input data is finite.
  Thread.sleep(15000L)

If I bypass the join and write the input topic directly to the output topic, I see messages arriving. I have already changed the join to a left join and added some printlns to see when the join key is extracted, but nothing is printed to the console. I also run the Kafka Streams reset tool before every run, so the application always starts from the beginning of the input. As a test, I queried the store: it works and contains keys that occur in the stream (although with a left join, missing keys should not suppress the output anyway). I am running out of ideas here.

1 Answer

The records in my source stream have a null key. Even though I am not using the stream key for the join (the join key is extracted from the value), the stream key must not be null. Once I created an intermediate stream with a dummy key, the join produced output. So even with a GlobalKTable, the restrictions on stream record keys documented for KStream-KTable joins also apply: http://docs.confluent.io/current/streams/developer-guide.html#kstream-ktable-join

Input records for the stream with a null key or a null value are ignored and do not trigger the join.
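
As a rough illustration of the intermediate stream described above, the sketch below re-keys the input with `selectKey` before the join. The object name `NullKeyWorkaround`, the helper names, and the `-1` placeholder are hypothetical choices for this example and are not part of the original application; it also assumes the `advertiserId` field holds a `java.lang.Long`, as in the question.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.streams.kstream.{KStream, KeyValueMapper}

object NullKeyWorkaround {
  // Hypothetical placeholder key for records without a usable advertiserId.
  val PlaceholderKey: java.lang.Long = java.lang.Long.valueOf(-1L)

  // Returns the advertiser id if it is a non-null Long, otherwise the placeholder.
  def nonNullKey(advertiserId: AnyRef): java.lang.Long = advertiserId match {
    case id: java.lang.Long => id
    case _                  => PlaceholderKey // covers null and unexpected types
  }

  // Re-keys the stream so that no record reaches the join with a null key.
  def withNonNullKey(
      stream: KStream[java.lang.Long, GenericRecord]
  ): KStream[java.lang.Long, GenericRecord] =
    stream.selectKey[java.lang.Long](
      new KeyValueMapper[java.lang.Long, GenericRecord, java.lang.Long] {
        override def apply(key: java.lang.Long, value: GenericRecord): java.lang.Long =
          if (value == null) PlaceholderKey
          else nonNullKey(value.get("advertiserId"))
      }
    )
}
```

With this in place, the join in the question would be called on `NullKeyWorkaround.withNonNullKey(networkImprStream)` instead of on `networkImprStream` directly. The key mapper passed to `leftJoin` still extracts `advertiserId` from the value, so the new stream key only has to be non-null; a constant dummy key would presumably work as well.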