I am using the Processor API to delete messages from a state store. The delete itself works: I confirmed it with an interactive query against the state store by Kafka key. However, it does not reduce the size of the Kafka Streams files on local disk under the tmp/kafka-streams directory.
@Override
public void init(ProcessorContext processorContext) {
    this.processorContext = processorContext;
    // Request a commit every 10 seconds of stream time.
    processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
        @Override
        public void punctuate(long timestamp) {
            processorContext.commit();
        }
    });
    this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
    log.info("Processor initialized");
}

@Override
public void process(String key, GenericRecord value) {
    // Delete every entry in the store; close the iterator when done to release its resources.
    try (KeyValueIterator<String, GenericRecord> iterator = statestore.all()) {
        iterator.forEachRemaining(keyValue -> {
            statestore.delete(keyValue.key);
        });
    }
}
Kafka Streams directory size:
2.3M /private/tmp/kafka-streams
3.3M /private/tmp/kafka-streams
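To track the directory size from code rather than by hand, something like the following could be used. This is only a minimal sketch using the standard library: the class name `StateDirSize` and the helper `sizeInBytes` are made up for illustration, and the default path is simply the one from the listing above. It sums logical file sizes, so the total may differ slightly from what a disk-usage tool reports.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class StateDirSize {

    // Sums the sizes of all regular files under root, in bytes.
    static long sizeInBytes(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                        .mapToLong(p -> {
                            try {
                                return Files.size(p);
                            } catch (IOException e) {
                                // A file may vanish between listing and sizing
                                // (the store can rewrite its files); count it as 0.
                                return 0L;
                            }
                        })
                        .sum();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: the directory from the listing above.
        // Pass a different path as the first argument if needed.
        Path stateDir = Paths.get(args.length > 0 ? args[0] : "/private/tmp/kafka-streams");
        if (!Files.isDirectory(stateDir)) {
            System.out.println(stateDir + " is not a directory");
            return;
        }
        System.out.printf("%d bytes\t%s%n", sizeInBytes(stateDir), stateDir);
    }
}
```

Running it once before and once after the deletes gives two numbers that can be compared directly.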
Do I need any specific configuration to keep the file size under control? If it doesn't work this way, is it okay to delete the kafka-streams directory? I assume it should be safe, since each delete removes the record from both the state store and the changelog topic.