We need to create compacted topics that get compacted after a certain size (segment.bytes) but, most importantly, after a certain time (segment.ms) has passed (even if segment.bytes has not been reached), configured at the topic level.
We have seen segment.bytes being honored, but segment.ms is not being honored. I have reproduced this issue with the Confluent Kafka 5.x distribution.
https://kafka.apache.org/documentation/#topicconfigs
This is what I read about segment.ms in the Apache Kafka documentation, and it makes me believe our understanding is right: segment.ms overrides segment.bytes, i.e. it forces a roll even when the segment is not full, when it comes to Kafka compacting a topic.
"segment.ms: This configuration controls the period of time after which Kafka will force the log to roll even if the segment file isn't full to ensure that retention can delete or compact old data."
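To make my reading of that sentence concrete, here is a sketch of the roll condition as I understand it (illustrative only, not broker code; `RollCheck` and `shouldRoll` are names I made up): a segment should roll when either limit is hit, so segment.ms caps the age of the active segment even when segment.bytes is never reached.

```java
// Illustrative sketch only -- RollCheck is my own name, not Kafka source code.
public class RollCheck {
    // A segment should roll when EITHER limit is reached: size (segment.bytes)
    // or age (segment.ms), whichever comes first.
    public static boolean shouldRoll(long segmentSizeBytes, long segmentAgeMs,
                                     long segmentBytes, long segmentMs) {
        return segmentSizeBytes >= segmentBytes || segmentAgeMs >= segmentMs;
    }

    public static void main(String[] args) {
        // With my topic config: segment.bytes=200000, segment.ms=12000
        System.out.println(shouldRoll(50_000, 15_000, 200_000, 12_000));  // small but old  -> true
        System.out.println(shouldRoll(250_000, 1_000, 200_000, 12_000)); // full but young -> true
        System.out.println(shouldRoll(50_000, 1_000, 200_000, 12_000));  // neither        -> false
    }
}
```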
I am sending data with the key rotating through the values 0-19, and the value is the string 'Spring Kafka Producer and Consumer Example' with the key value appended.
This is the producer code:
@Override
public void run(String... strings) throws Exception {
    String data = "Spring Kafka Producer and Consumer Example";
    for (int j = 0; j < 30000; j++) {
        for (int i = 0; i < 20; i++) {
            sender.send(Integer.toString(i), data + i);
        }
    }
}
The code sample is here: https://github.com/leofoto/kafka-producer-consumer.git
I took the code sample from https://memorynotfound.com/spring-kafka-consume-producer-example/ and modified it for this test case.
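The loop above writes 600,000 records (30,000 iterations x 20 keys) but only 20 distinct keys. Here is a toy model of what I expect compaction to eventually leave behind (`ExpectedCompaction` and `compact` are my own illustrative names, not Kafka code): only the latest value per key, i.e. 20 records.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of what I EXPECT compaction to leave for the producer loop above.
public class ExpectedCompaction {
    public static Map<String, String> compact(int iterations, int keys, String data) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (int j = 0; j < iterations; j++) {
            for (int i = 0; i < keys; i++) {
                latest.put(Integer.toString(i), data + i); // later writes overwrite earlier ones
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        Map<String, String> survivors =
            compact(30000, 20, "Spring Kafka Producer and Consumer Example");
        System.out.println(survivors.size()); // 20
    }
}
```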
I first created the compacted topic. In the broker logs I see the following:
Created log for partition my-topic-compact-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2018-09-17 21:28:00,110] INFO [Partition my-topic-compact-0 broker=0] No checkpointed highwatermark is found for partition my-topic-compact-0 (kafka.cluster.Partition)
Then I changed the configuration to make the topic compacted:
./kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name my-topic-compact --alter --add-config min.cleanable.dirty.ratio=0.01,cleanup.policy=compact,segment.ms=12000,delete.retention.ms=100,segment.bytes=200000
Completed Updating config for entity: topic 'my-topic-compact'.
The broker logs show it again (correctly reporting that it is a compacted topic now):
[2018-09-17 22:06:25,745] INFO Processing notification(s) to /config/changes (kafka.common.ZkNodeChangeNotificationListener)
[2018-09-17 22:06:25,746] INFO Processing override for entityPath: topics/my-topic-compact with config: Map(cleanup.policy -> compact, segment.ms -> 12000, min.cleanable.dirty.ratio -> 0.01, segment.bytes -> 200000, delete.retention.ms -> 100) (kafka.server.DynamicConfigManager)
The kafka-configs --describe command also shows it clearly:
./kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name my-topic-compact --describe
Configs for topic 'my-topic-compact' are segment.bytes=200000,min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,segment.ms=12000,cleanup.policy=compact
When I start the Kafka server, I see the following message:
"Starting log cleanup with a period of 300000 ms"
(I am sure 300 seconds is the broker-level config value; the topic-level value is 12 seconds in this case.)
[2018-09-17 22:01:31,215] INFO [Log partition=my-topic-non-compact-0, dir=/tmp/kafka-logs] Completed load of log with 1 segments, log start offset 0 and log end offset 20 in 2 ms (kafka.log.Log)
[2018-09-17 22:01:31,218] INFO Logs loading complete in 378 ms. (kafka.log.LogManager)
[2018-09-17 22:01:31,224] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2018-09-17 22:01:31,225] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2018-09-17 22:01:31,439] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2018-09-17 22:01:31,463] INFO [SocketServer brokerId=0] Started 1 acceptor threads (kafka.network.SocketServer)
[2018-09-17 22:01:31,478] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-09-17 22:01:31,478] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-09-17 22:01:31,479] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2018-09-17 22:01:31,487] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2018-09-17 22:01:31,537] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
[2018-09-17 22:01:31,541] INFO Result of znode creation at /brokers/ids/0 is: OK (kafka.zk.KafkaZkClient)
[2018-09-17 22:01:31,542] INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(192.168.0.11,9092,ListenerName(PLAINTEXT),PLAINTEXT)) (kafka.zk.KafkaZkClient)
Then, when I wrote a lot of data, I saw segments rolling as well. There is a lot of activity, which triggers compaction (which is fine). I sent over 300k records, compaction happens, and a new consumer that consumes the messages (after compaction has taken place) sees around 3225 records.
[2018-09-17 22:09:21,602] INFO [Log partition=my-topic-compact-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 185361 in 4 ms. (kafka.log.Log)
[2018-09-17 22:09:21,673] INFO [ProducerStateManager partition=my-topic-compact-0] Writing producer snapshot at offset 188897 (kafka.log.ProducerStateManager)
[2018-09-17 22:09:21,675] INFO [Log partition=my-topic-compact-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 188897 in 3 ms. (kafka.log.Log)
[2018-09-17 22:09:21,755] INFO [ProducerStateManager partition=my-topic-compact-0] Writing producer snapshot at offset 192348 (kafka.log.ProducerStateManager)
[2018-09-17 22:09:21,758] INFO [Log partition=my-topic-compact-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 192348 in 3 ms. (kafka.log.Log)
[2018-09-17 22:09:21,831] INFO [ProducerStateManager partition=my-topic-compact-0] Writing producer snapshot at offset 195846 (kafka.log.ProducerStateManager)
[2018-09-17 22:09:21,834] INFO [Log partition=my-topic-compact-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 195846 in 3 ms. (kafka.log.Log)
[2018-09-17 22:09:21,879] INFO [ProducerStateManager partition=my-topic-compact-0] Writing producer snapshot at offset 199461 (kafka.log.ProducerStateManager)
[2018-09-17 22:09:21,882] INFO [Log partition=my-topic-compact-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 199461 in 3 ms. (kafka.log.Log)
[2018-09-17 22:09:21,909] INFO [ProducerStateManager partition=my-topic-compact-0] Writing producer snapshot at offset 203134 (kafka.log.ProducerStateManager)
[2018-09-17 22:09:21,915] INFO [Log partition=my-topic-compact-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 203134 in 7 ms. (kafka.log.Log)
[2018-09-17 22:09:21,980] INFO [ProducerStateManager partition=my-topic-compact-0] Writing producer snapshot at offset 206703 (kafka.log.ProducerStateManager)
[2018-09-17 22:09:21,985] INFO [Log partition=my-topic-compact-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 206703 in 6 ms. (kafka.log.Log)
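The rolled offsets in those log lines let me estimate segment capacity. This little calculation (`SegmentMath` is my own helper name; the offsets are copied from the broker log) shows that each ~200 KB segment holds roughly 3,400-3,700 records, the same order of magnitude as the 3,225 records consumers keep seeing:

```java
// SegmentMath is my own name; the offsets below are copied from the broker log above.
public class SegmentMath {
    // Difference between consecutive roll offsets = records per rolled segment.
    public static long[] deltas(long[] rolledOffsets) {
        long[] d = new long[rolledOffsets.length - 1];
        for (int i = 1; i < rolledOffsets.length; i++) {
            d[i - 1] = rolledOffsets[i] - rolledOffsets[i - 1];
        }
        return d;
    }

    public static void main(String[] args) {
        // Offsets at which "Rolled new log segment" was logged
        long[] rolled = {185361, 188897, 192348, 195846, 199461, 203134, 206703};
        for (long d : deltas(rolled)) {
            System.out.println(d); // 3536, 3451, 3498, 3615, 3673, 3569
        }
    }
}
```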
Now, no matter how long I wait (well past 12 seconds), log compaction does not kick in. And no matter how long I wait before running the following command (with a new consumer group every time):
./kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic-compact --from-beginning --property print.key=true --group new-group16
every new consumer consumes exactly 3225 messages. If compaction had happened after the topic-level segment.ms passed, it should have compacted the topic down to just 20 keys and their latest values. But we don't see that behaviour. Am I missing anything?
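One hypothesis I can model (I am not certain this is what the broker actually does; `ActiveSegmentHypothesis` and its method are purely my own illustrative names) is that the cleaner only ever compacts segments that have already been rolled, passing every record in the still-active segment through. A toy version of that behaviour yields thousands of survivors rather than 20, which at least matches the shape of what we observe:

```java
// Hypothetical model (my guess, not broker source): compaction keeps at most one
// record per key across CLOSED segments, and the active segment is left untouched.
public class ActiveSegmentHypothesis {
    public static int survivorsIfActiveSegmentIsSkipped(int totalRecords, int keys,
                                                        int recordsPerSegment) {
        int activeRecords = totalRecords % recordsPerSegment; // still in the active segment
        if (activeRecords == 0) activeRecords = recordsPerSegment;
        // Closed segments compact down to at most one record per key.
        return keys + activeRecords;
    }

    public static void main(String[] args) {
        // 600k sends, 20 keys, ~3,500 records per 200 KB segment (from the roll logs):
        // thousands of survivors, not 20
        System.out.println(survivorsIfActiveSegmentIsSkipped(600_000, 20, 3_500));
    }
}
```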
DELETES NOT WORKING
On top of that, when I send a null payload for the same keys, like this:
@Override
public void run(String... strings) throws Exception {
    // send a null payload (tombstone) for each of the same keys
    for (int j = 0; j < 2; j++) {
        for (int i = 0; i < 20; i++) {
            sender.send(Integer.toString(i), null);
        }
    }
}
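For reference, this toy model (`TombstoneExpectation` and `apply` are names I made up for illustration) captures what we expect tombstones to do once delete.retention.ms has elapsed: every tombstoned key should disappear from the compacted log entirely.

```java
import java.util.*;

// Toy model of the delete semantics we expect: a null value (tombstone) for a key
// should, after delete.retention.ms, remove the key from the compacted log.
public class TombstoneExpectation {
    public static Map<String, String> apply(Map<String, String> compacted,
                                            Iterable<String> tombstonedKeys) {
        Map<String, String> result = new HashMap<>(compacted);
        for (String key : tombstonedKeys) {
            result.remove(key); // null payload -> key deleted once the tombstone expires
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> log = new HashMap<>();
        for (int i = 0; i < 20; i++) log.put(Integer.toString(i), "value" + i);
        List<String> keys = new ArrayList<>(log.keySet());
        System.out.println(apply(log, keys).size()); // 0 -- we expect an empty topic
    }
}
```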
we expect the messages to eventually be deleted by the next compaction cycle. That is not happening for us either after the segment.ms time has passed (12 seconds in our case, at the topic-level config).
./kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name my-topic-compact --describe
Configs for topic 'my-topic-compact' are segment.bytes=200000,min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,segment.ms=12000,cleanup.policy=compact