0
votes

I am using kafka 2.10-0.9.0.1 When I delete a topic via command and topic is marked for deletion.

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic T.AB_KAFF

However, as I publish messages to this topic and when I subscribe to this topic again, the topic description says that its lag is -ve of what was current offset (last committed offset)

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group G.AB_KAFF

Last state of topic :

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER G.AB_KAFF, T.AB_KAFF, 0, 5, 5, 0, consumer-2_/127.0.0.1 G.AB_KAFF, T.AB_KAFF, 1, 5, 5, 0, consumer-2_/127.0.0.1

Now I delete the topic. And publish message to this topic

State of topic after subscribing again :

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER G.AB_KAFF, T.AB_KAFF, 0, 5, 1, -4, consumer-3_/127.0.0.1 G.AB_KAFF, T.AB_KAFF, 1, 5, 0, -5, consumer-3_/127.0.0.1

So why does kafka set the lag to -ve number. Isn't that is a potential to cause issues ?

Suppose I subscribe again to deleted topic, so I won't be getting any messages till its -ve lag is 0

Other Information :
- I am running 3 kafka nodes on my local
- I have added property : delete.topic.enable=true
- This topic is created with partitions=2, replication-factor=2

1
Did you try deleting the topic once you've added delete.topic.enable in your broker config file. ?Kulasangar
Yes, this property I have added in my server.properties filenikhil7610
What does -ve lag mean? So did your topic get deleted ?Kulasangar

1 Answers

1
votes

Once you delete a topic it's records stored in the Kafka will be lost. So, a query to the latest offset returns 0.

Consumer instances in the group periodically commits the last read record offset in a special topic __consumer-offset-{0..49}. During the lag computation, it does latestOffset-lastReadRecordOffset so a negative lag shown in the kafka-consumers-groups command after a topic deletion.

[kamal@tcltest1 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list localhost:9092 # Query to get latest offset
test:0:1000
[kamal@tcltest1 bin]$ sh kafka-topics.sh --topic test --zookeeper localhost:2181 --delete
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[kamal@tcltest1 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list localhost:9092 # After deletion
[kamal@tcltest1 bin]$ 

Meta-data stored in the Zookeeper will be removed asynchronously. (Correct me if I'm wrong)

[kamal@tcltest1 bin]$ sh zookeeper-shell.sh localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
cZxid = 0x13c
ctime = Thu Jun 09 09:56:05 IST 2016
mZxid = 0x13c
mtime = Thu Jun 09 09:56:05 IST 2016
pZxid = 0x13c
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0