The implementation is in Python, using confluent_kafka.
I have a Consumer object that polls messages from a Kafka topic. The messages are consumed by other large objects for further processing, and I cannot afford to back up those objects after every message because of their size.
Instead, I periodically dump the object state and then manually commit the consumer. Below is the sample code I implemented.
from confluent_kafka import Consumer, KafkaError, TopicPartition
c = Consumer({
    'bootstrap.servers': 'myserver',
    'group.id': 'mygroup',
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'enable.auto.commit': False
})
c.subscribe(['mytopic'])
offsets = {}
for i in range(10):
    msg = c.poll()
    if msg is None or msg.error():
        continue
    par = msg.partition()
    off = msg.offset()
    offsets[par] = off  # was offsets[p]; `p` is undefined
c.commit(asynchronous=False)  # `async` is a reserved word in Python 3.7+; newer confluent_kafka uses `asynchronous`
print(offsets)
When I ran this code a second time, I expected the offset of a message from the same partition to be the next one, i.e. the previously printed offset + 1.
But the offsets had advanced by a few hundred.
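For context, my understanding is that commit() with no offsets argument commits the consumer's current position, not the offsets I recorded. If the intent is to commit exactly what was processed, a minimal pure-Python sketch (the helper name is hypothetical) of the adjustment would be, since Kafka's convention is that the committed offset is the offset of the NEXT message to read:

```python
def next_offsets_to_commit(offsets):
    """Turn {partition: last_processed_offset} into the offsets to commit.
    Kafka convention (assumption stated above): the committed offset is the
    NEXT message to consume, so each recorded offset is advanced by one."""
    return {par: off + 1 for par, off in offsets.items()}

# These could then be wrapped as TopicPartition('mytopic', par, off)
# and passed via c.commit(offsets=[...], asynchronous=False).
```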
I also tried to manually assign the positions as follows:
lst_part = []
for par, off in offsets.items():
    lst_part.append(TopicPartition('mytopic', par, off))
c.assign(lst_part)
# then start polling messages
The newly polled messages do not start at the assigned offset + 1.
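My understanding (which may be wrong) is that assign() starts fetching AT the given offset rather than one past it, so resuming after the last processed message would need the same +1 adjustment. A sketch under that assumption, with a hypothetical helper returning plain triples:

```python
def resume_assignments(topic, offsets):
    """Build (topic, partition, offset) triples for resuming AFTER the last
    processed offset per partition, assuming assign() fetches starting AT
    the given offset. Wrap each triple as TopicPartition(t, p, o) before
    passing the list to c.assign([...])."""
    return [(topic, par, off + 1) for par, off in sorted(offsets.items())]
```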