
I'm trying to delete records by sending null values (tombstones) into the downstream changelog. I know that in the state store a record is deleted simply by being set to null, but when you run an aggregation on the KTable or KStream, the nulls are skipped and the record is not deleted. I need to find a way to flag records for deletion inside the aggregation so that Kafka knows the record can be deleted. Here is my code:

    public void deleteByEntity(String inputTopic, String target, String stateStoreName) {
        // Needs a property set to true in application.properties:
        // if ("true".equals(utils.getProperty(ApplicationConfigs.KAFKA_DELETE_BY_ENTITY))) { ... }
        Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.as(stateStoreName);

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, String> docStream = streamsBuilder.stream(inputTopic);

        KTable<String, String> dataInTable =
            docStream
                .groupByKey()
                .reduce(
                    (value1, value2) -> {
                      // If the current aggregate equals the target value, return null:
                      // this writes a tombstone, deletes the entry from the state store,
                      // and sends a null record downstream.
                      if (value1.equals(target)) {
                        return null;
                      }
                      return value2;
                    },
                    materialized);
    }

Thanks


1 Answer


If you return null from your Reducer, the record is deleted from the state store and a corresponding output record <key, null> is sent downstream. Hence, no additional downstream processing should be required.

Note that null keys and null values are only ignored for input records to reduce(); a null returned by the Reducer itself is treated as a delete.
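To illustrate, here is a minimal sketch using TopologyTestDriver (from the kafka-streams-test-utils artifact) showing that a null returned by the Reducer both removes the entry from the state store and forwards a <key, null> tombstone downstream. The topic names "input" and "output", the sentinel value "DELETE", and the store name "store" are all made up for this example; adapt them to your topology.

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Materialized;

public class TombstoneDemo {

    // Builds a reduce() topology and feeds it two records for the same key:
    // a normal value, then a sentinel "DELETE" that makes the Reducer return null.
    static List<KeyValue<String, String>> runDemo() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input")
            .groupByKey()
            .reduce(
                // Returning null writes a tombstone: the entry is removed from
                // the state store and <key, null> is forwarded downstream.
                (oldValue, newValue) -> "DELETE".equals(newValue) ? null : newValue,
                Materialized.as("store"))
            .toStream()
            .to("output");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tombstone-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Disable record caching so every KTable update is forwarded immediately.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "input", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "output", Serdes.String().deserializer(), Serdes.String().deserializer());

            in.pipeInput("k1", "v1");      // first record: no previous aggregate, passes through
            in.pipeInput("k1", "DELETE");  // Reducer returns null -> tombstone

            // After the tombstone, the store no longer contains the key.
            assert driver.getKeyValueStore("store").get("k1") == null;
            return out.readKeyValuesToList();
        }
    }

    public static void main(String[] args) {
        for (KeyValue<String, String> kv : runDemo()) {
            System.out.println(kv.key + " -> " + kv.value);
        }
    }
}
```

Running this prints two updates for k1: the original value, then the null tombstone, confirming the delete reaches the output topic without any extra downstream logic.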