
I was trying to delete messages from my Kafka topic using the Java AdminClient API's deleteRecords method. These are the steps I tried:


    1. I pushed 20000 records to my TEST-DELETE topic
    2. Started a console consumer and consumed all the messages
    3. Invoked my Java program to delete all those 20k messages
    4. Started another console consumer with a different group id; this consumer does not receive any of the deleted messages

When I checked the file system, I could still see all those 20k records occupying disk space. My intention is to permanently delete those records from the file system as well.

My topic configuration is given below, along with my server.properties settings:


Topic:TEST-DELETE       PartitionCount:4        ReplicationFactor:1     Configs:cleanup.policy=delete
        Topic: TEST-DELETE    Partition: 0      Leader: 0     Replicas: 0       Isr: 0
        Topic: TEST-DELETE    Partition: 1      Leader: 0     Replicas: 0       Isr: 0
        Topic: TEST-DELETE    Partition: 2      Leader: 0     Replicas: 0       Isr: 0
        Topic: TEST-DELETE    Partition: 3      Leader: 0     Replicas: 0       Isr: 0


    log.retention.hours=24
    log.retention.check.interval.ms=60000
    log.cleaner.delete.retention.ms=60000
    file.delete.delay.ms=60000
    delete.retention.ms=60000
    offsets.retention.minutes=5
    offsets.retention.check.interval.ms=60000
    log.cleaner.enable=true
    log.cleanup.policy=compact,delete

My delete code is given below


// allTopicPartitions: topic name -> (partition number -> delete records before this offset)
public void deleteRecords(Map<String, Map<Integer, Long>> allTopicPartitions) {

        Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();

        allTopicPartitions.entrySet().forEach(topicDetails -> {

            String topicName = topicDetails.getKey();
            Map<Integer, Long> partitionOffsets = topicDetails.getValue();

            partitionOffsets.entrySet().forEach(partitionDetails -> {

                // Skip partitions where there is nothing to delete
                if (partitionDetails.getValue() != 0) {
                    recordsToDelete.put(new TopicPartition(topicName, partitionDetails.getKey()),
                            RecordsToDelete.beforeOffset(partitionDetails.getValue()));
                }
            });
        });

        DeleteRecordsResult deleteRecords = this.client.deleteRecords(recordsToDelete);

        Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = deleteRecords.lowWatermarks();

        // Log the new low watermark (log start offset) returned for each partition
        lowWatermarks.entrySet().forEach(entry -> {
            try {
                logger.info(entry.getKey().topic() + " " + entry.getKey().partition() + " "
                        + entry.getValue().get().lowWatermark());
            } catch (Exception ex) {
                logger.error("Failed to fetch low watermark for " + entry.getKey(), ex);
            }
        });

    }
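
For reference, the method takes a map of topic name to (partition number to delete-before offset). A call matching the run below would look roughly like this (20k records spread over 4 partitions, so the 5000s are just the per-partition offsets from this run):

Map<Integer, Long> partitionOffsets = new HashMap<>();
for (int partition = 0; partition < 4; partition++) {
    partitionOffsets.put(partition, 5000L); // delete everything before offset 5000
}
deleteRecords(Collections.singletonMap("TEST-DELETE", partitionOffsets));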

The output of my Java program is given below:



2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 1 5000
2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 0 5000
2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 3 5000
2019-06-25 16:21:15 INFO  MyKafkaAdminClient:247 - TEST-DELETE 2 5000

My intention is to delete the consumed records from the file system, as I am working with limited storage for my Kafka broker.

I would like some help with the following questions:

  1. I was under the impression that deleteRecords would remove the messages from the file system too, but it looks like I got that wrong!
  2. How long will those deleted records remain in the log directory?
  3. Is there any specific configuration I need to use in order to remove the records from the file system once the deleteRecords API is invoked?

Appreciate your help

Thanks

I think it marks for deletion when the LogCleaner thread runs. You can't forcibly make the entire cluster delete files immediately - OneCricketeer

1 Answer


The recommended approach to handle this is to set retention.ms and related configuration values for the topics you're interested in. That way, you can define how long Kafka will store your data before it deletes it, making sure all your downstream consumers have had a chance to pull the data down before it's deleted from the Kafka cluster.

If, however, you still want to force Kafka to delete based on bytes, there are the log.retention.bytes and retention.bytes configuration values. The first is a cluster-wide setting; the second is the topic-specific setting, which by default takes whatever the first one is set to, but which you can override per topic. The retention.bytes limit is enforced per partition, so to estimate how much a topic can occupy you should multiply it by the total number of topic partitions; for example, retention.bytes=268435456 (256 MB) on your four-partition TEST-DELETE topic allows roughly 1 GB on disk.
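
Since you already have an AdminClient in your delete code, a minimal sketch of setting both of these at the topic level could look like the following (the broker address and retention values are placeholders you would tune to your storage budget):

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (AdminClient client = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "TEST-DELETE");

            // Placeholder values: keep at most 1 hour and at most 256 MB per partition
            Config retention = new Config(Arrays.asList(
                    new ConfigEntry("retention.ms", "3600000"),
                    new ConfigEntry("retention.bytes", String.valueOf(256L * 1024 * 1024))));

            // alterConfigs replaces the topic's dynamic config with exactly these entries
            client.alterConfigs(Collections.singletonMap(topic, retention)).all().get();
        }
    }
}

On newer client versions incrementalAlterConfigs is preferred, since plain alterConfigs overwrites any other dynamically set topic configs, but the idea is the same.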

Be aware, however, that if you have a runaway producer that suddenly starts generating a lot of data and you have a hard byte limit set, you might wipe out entire days' worth of data in the cluster and be left with only the last few minutes of data, possibly before even valid consumers can pull the data down from the cluster. This is why it's much better to give your Kafka topics time-based retention rather than byte-based.

You can find the configuration properties and their explanation in the official Kafka docs: https://kafka.apache.org/documentation/