I'm trying to delete records whose value is null from the downstream changelog. I know that in the state store a null value (a tombstone) deletes the record, but when you do an aggregation on a KTable or KStream, records with a null value are skipped and nothing is deleted. I need to figure out a way to set a deletion flag in the aggregation so that Kafka knows the record can be deleted. Here is my code:
public void deleteByEntity(String inputTopic, String target, String stateStoreName) {
    // Need to set property to true in application.properties
    // if ("true".equals(utils.getProperty(ApplicationConfigs.KAFKA_DELETE_BY_ENTITY))) {
    Materialized<String, String, KeyValueStore<Bytes, byte[]>> storeName =
        Materialized.as(stateStoreName);
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> docStream = streamsBuilder.stream(inputTopic);
    KTable<?, ?> dataInTable =
        docStream
            .groupByKey()
            .reduce(
                (value1, value2) -> {
                    // System.out.println("aa");
                    if (value1.equals(target)) {
                        // If the current aggregate value equals target, return null: this should
                        // create a tombstone, delete the key from the state store, and send a
                        // null record downstream
                        return null;
                    }
                    return value2;
                },
                storeName);
    // System.out.println(dataInTable);
}
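To make the "flag for deletion" idea concrete, here is a minimal plain-Java model of it. It does not use the Kafka Streams API. It only mimics the behavior described above, under two assumptions taken from this question: a grouped reduce skips input records whose value is null, and a reducer that returns null removes the key. The names `TombstoneFlagSketch`, `DELETE_MARKER`, `flagNull`, and `process` are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class TombstoneFlagSketch {
    // Hypothetical sentinel that stands in for a null value (not a Kafka constant).
    static final String DELETE_MARKER = "__DELETE__";

    // A plain map standing in for the state store behind the reduced table.
    final Map<String, String> store = new HashMap<>();

    // Reducer: when the incoming value is the marker, return null to request deletion.
    final BinaryOperator<String> reducer =
        (oldValue, newValue) -> DELETE_MARKER.equals(newValue) ? null : newValue;

    // Step applied before grouping: replace null with the marker so the
    // record is not skipped by the reduce step.
    static String flagNull(String value) {
        return value == null ? DELETE_MARKER : value;
    }

    // Models one record passing through the grouped reduce.
    void process(String key, String value) {
        if (key == null || value == null) {
            return; // assumption from the question: null-valued records are skipped
        }
        String oldValue = store.get(key);
        // In this model the first value for a key is stored without calling the reducer.
        String result = (oldValue == null) ? value : reducer.apply(oldValue, value);
        if (result == null) {
            store.remove(key); // a null result acts as the tombstone
        } else {
            store.put(key, result);
        }
    }

    public static void main(String[] args) {
        TombstoneFlagSketch sketch = new TombstoneFlagSketch();
        sketch.process("doc-1", "payload");
        sketch.process("doc-1", null);           // skipped, key is still present
        System.out.println(sketch.store.containsKey("doc-1"));
        sketch.process("doc-1", flagNull(null)); // marker reaches the reducer
        System.out.println(sketch.store.containsKey("doc-1"));
    }
}
```

In a real topology the equivalent of `flagNull` would have to run before `groupByKey()`, and whether a null reducer result is also written to the changelog as a tombstone is exactly the behavior this question is asking about, so the model should not be read as confirmation of it.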
Thanks