
The implementation is in Python, using confluent_kafka.

I have a consumer object that polls messages from a Kafka topic. The messages are used for further processing by other large objects, and I cannot afford to back up those objects after every message because of their size.

I therefore periodically dump the object and then manually commit the consumer. Below is the sample code I implemented.

from confluent_kafka import Consumer, KafkaError, TopicPartition

c = Consumer({
    'bootstrap.servers': 'myserver',
    'group.id': 'mygroup',
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'enable.auto.commit': "false"
})
c.subscribe(['mytopic'])

offsets = {}

for i in range(10):
    msg = c.poll()

    if msg.error():
        continue

    par = msg.partition()
    off = msg.offset()
    offsets[par] = off  # track the last seen offset per partition

c.commit(async=False)

print(offsets)

When I ran this code a second time, I expected the offsets, if from the same partition, to continue from the previous run, i.e. to be the printed offset + 1.

But the offsets had advanced by a few hundred.

I also tried to manually assign the positions as follows:

lst_part = []

for par, off in offsets.items():
    lst_part.append(TopicPartition('mytopic', par, off))

c.assign(lst_part)

# then start polling messages

The newly polled messages do not start at the assigned offset + 1.


1 Answer


c.commit(async=False) commits the offsets of all consumed partitions for which a message has been returned from the client to the application by a poll() call.

If you want finer-grained control over which offsets to commit, you can either pass an explicit [TopicPartition(..)] list to commit() (make sure to commit last_message_offset + 1), or set enable.auto.offset.store to false and explicitly call store_offsets() for the messages/offsets you wish to store for a future commit() call.
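The first option could look like this minimal sketch. The `next_commit_offsets` helper is my own illustrative name, and the broker details and `offsets` dict mirror the question's code; the actual commit call is shown as a comment since it needs a running broker:

```python
def next_commit_offsets(offsets):
    """Map {partition: last_message_offset} to {partition: offset_to_commit}.

    Kafka commits name the NEXT message to consume, so the value
    to commit is last_message_offset + 1 for each partition.
    """
    return {par: off + 1 for par, off in offsets.items()}

# With the question's `offsets` dict, the commit call would then be
# (sketch; requires a running broker and the consumer `c` from above):
#
#   from confluent_kafka import TopicPartition
#   commit_list = [TopicPartition('mytopic', par, off)
#                  for par, off in next_commit_offsets(offsets).items()]
#   c.commit(offsets=commit_list, async=False)
```

This way only the offsets you actually tracked get committed, rather than whatever the client last delivered.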

Do note that store_offsets() is only available on master and is not yet available in a released version of the confluent-kafka-python client, but soon will be.
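A sketch of the store_offsets() pattern, assuming a client version that ships it. The configuration keys are real librdkafka settings; the broker address and topic are the question's placeholders, and `process()` is a hypothetical processing step:

```python
# Consumer configuration for manual offset storage: auto-commit stays on,
# but only offsets we explicitly store become eligible for commit.
conf = {
    'bootstrap.servers': 'myserver',
    'group.id': 'mygroup',
    'enable.auto.commit': True,        # background commit of *stored* offsets
    'enable.auto.offset.store': False, # do NOT store offsets automatically on poll()
}

# Processing loop (sketch; requires a running broker):
#
#   c = Consumer(conf)
#   c.subscribe(['mytopic'])
#   while True:
#       msg = c.poll(1.0)
#       if msg is None or msg.error():
#           continue
#       process(msg)                  # hypothetical per-message processing
#       c.store_offsets(message=msg)  # mark this message as safe to commit
```

With this setup a crash between process() and store_offsets() means the message is redelivered rather than silently skipped.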