I am using the Processor API to delete messages from a state store. The delete works: I confirmed it with an interactive query against the state store by Kafka key. However, it does not reduce the size of the Kafka Streams files on local disk under the directory tmp/kafka-streams.

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(long l) {
                processorContext.commit();
            }
        }); // invoke punctuate every 10 seconds (stream time)
        this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
        log.info("Processor initialized");
    }

    @Override
    public void process(String key, GenericRecord value) {
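        // Close the iterator when done; an open store iterator holds on to resources.
        try (KeyValueIterator<String, GenericRecord> iterator = statestore.all()) {
            iterator.forEachRemaining(keyValue -> statestore.delete(keyValue.key));
        }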
    }

Kafka Streams directory size:

2.3M    /private/tmp/kafka-streams
3.3M    /private/tmp/kafka-streams

Do I need any specific configuration to keep the file size under control? If it doesn't work this way, is it okay to delete the kafka-streams directory? I assume it should be safe, since such a delete removes the record from both the state store and the changelog topic.

1 Answer

Deleting a key does not shrink the files right away: RocksDB writes a tombstone and reclaims the space later, when it compacts files in the background. Hence, if you need more aggressive compaction, you should pass in a custom RocksDBConfigSetter via the Streams config parameter rocksdb.config.setter. For more details about RocksDB, check out the RocksDB documentation.

https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter
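
As a rough sketch of the registration step only: the setter is handed to Kafka Streams by class name through the rocksdb.config.setter property. The class name com.example.AggressiveCompactionSetter is hypothetical and stands for your own implementation of the RocksDBConfigSetter interface (not shown here); the application.id and bootstrap.servers values are placeholders as well.

```java
import java.util.Properties;

public class StreamsRocksDbConfig {
    // Hypothetical class name: replace with your own RocksDBConfigSetter implementation.
    static final String SETTER_CLASS = "com.example.AggressiveCompactionSetter";

    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "state-store-cleanup-app"); // placeholder value
        props.put("bootstrap.servers", "localhost:9092");       // placeholder value
        // Same key as the constant StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG.
        props.put("rocksdb.config.setter", SETTER_CLASS);
        return props;
    }

    public static void main(String[] args) {
        // Print the registered setter class name.
        System.out.println(build().getProperty("rocksdb.config.setter"));
    }
}
```

The resulting Properties object would then be passed to the KafkaStreams constructor together with the topology.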

However, I would not recommend changing RocksDB configs as long as there is no real issue; you can do more harm than good. It seems your store size is quite small, so I don't see a real problem at the moment.

By the way: if you go to production, you should change the state.dir config to a directory where the state survives a machine restart. If you keep state in the default /tmp location, it is most likely gone after a restart, and an expensive recovery from the changelog topics would be triggered.
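
A minimal sketch of overriding state.dir, assuming /var/lib/kafka-streams is a directory on persistent disk that the application can write to (that path is only an illustration, not a recommended default):

```java
import java.util.Properties;

public class StreamsStateDirConfig {
    // Builds a config fragment that moves the local state stores out of /tmp.
    static Properties build(String stateDir) {
        Properties props = new Properties();
        // Same key as the constant StreamsConfig.STATE_DIR_CONFIG.
        props.put("state.dir", stateDir);
        return props;
    }

    public static void main(String[] args) {
        // Assumed path; pick any location that is not wiped on reboot.
        Properties props = build("/var/lib/kafka-streams");
        System.out.println(props.getProperty("state.dir"));
    }
}
```

These properties would be merged with the rest of the Streams configuration before the application starts.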