
I am using the librdkafka C API consumer (specifically, I read with rd_kafka_consumer_poll, and I did call rd_kafka_poll_set_consumer before this).

The problem I see is that in my Google Test I do the following:

  1. Write 3 messages to Kafka.

  2. Initialize/start the Kafka consumer (rd_kafka_consumer_poll).

  3. In rebalance_cb, set each partition's offset to RD_KAFKA_OFFSET_STORED and assign the partitions to the handle.

  4. At this point I expect it to read all 3 messages, but it reads only the last one, and, surprisingly, the offset for each partition is already updated!

Am I missing something in how I use the Kafka consumer?

One more question: I initially thought the stored offset lives in the Kafka broker and that there is a unique offset for each topic + consumer group id + partition combination.

So I thought different consumer groups reading the same topic should have different offsets.

However, that doesn't seem to be the case: I always read from the same offset when I use different consumer groups.

I suspect this may be related to offset commits, but I'm not sure where to tackle it.

Any insight?

Answer

Configuration to look at: auto.offset.reset

From the Kafka consumer documentation:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server

From the librdkafka documentation:

Action to take when there is no initial offset in offset store or the desired offset is out of range: 'smallest','earliest' - automatically reset the offset to the smallest offset, 'largest','latest' - automatically reset the offset to the largest offset, 'error' - trigger an error which is retrieved by consuming messages and checking 'message->err'. Type: enum value

The default value is latest.

Furthermore, rdkafka.h defines:

#define RD_KAFKA_OFFSET_STORED -1000

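So you're setting each partition's offset to -1000. That is not a real message offset but a logical one: it tells librdkafka to start from the offset committed for the consumer group. If the group has no committed offset for a partition yet (for example, with a new group.id), auto.offset.reset decides where consumption starts, and with the default of latest the consumer starts at the end of the partition rather than at the messages written before it joined. This would also explain why different consumer groups seem to start from the same place: each new group has nothing committed, so each one falls back to latest. Setting auto.offset.reset to earliest (or smallest) makes such a group start from the beginning of each partition instead.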
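To make the fallback concrete, here is a simplified, self-contained model in plain C. It does not call librdkafka and is not its actual implementation. It assumes committed offsets are keyed by consumer group, topic and partition (which is how Kafka stores them), that low/high are a partition's low and high watermarks, and it only handles the stored offset or an explicit absolute offset. The names OFFSET_STORED, committed_offset, find_committed and resolve_start_offset are hypothetical; OFFSET_STORED mirrors the value of RD_KAFKA_OFFSET_STORED.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Mirrors RD_KAFKA_OFFSET_STORED (-1000) from rdkafka.h. */
#define OFFSET_STORED (-1000)

/* Simplified auto.offset.reset policies. */
enum offset_reset { RESET_EARLIEST, RESET_LATEST };

/* Hypothetical committed-offset entry: one per group + topic + partition. */
struct committed_offset {
    const char *group;
    const char *topic;
    int32_t partition;
    int64_t offset;
};

/* Returns 1 and sets *out if (group, topic, partition) has a committed
 * offset in the table, 0 otherwise. */
static int find_committed(const struct committed_offset *table, size_t n,
                          const char *group, const char *topic,
                          int32_t partition, int64_t *out) {
    for (size_t i = 0; i < n; i++) {
        if (strcmp(table[i].group, group) == 0 &&
            strcmp(table[i].topic, topic) == 0 &&
            table[i].partition == partition) {
            *out = table[i].offset;
            return 1;
        }
    }
    return 0;
}

/* Where consumption starts. low = first available offset,
 * high = next offset to be written. */
static int64_t resolve_start_offset(int64_t requested,
                                    const struct committed_offset *table, size_t n,
                                    const char *group, const char *topic,
                                    int32_t partition, int64_t low, int64_t high,
                                    enum offset_reset reset) {
    int64_t start = 0;
    int have_start;

    if (requested == OFFSET_STORED) {
        have_start = find_committed(table, n, group, topic, partition, &start);
    } else {
        start = requested; /* explicit absolute offset */
        have_start = 1;
    }

    /* No committed offset, or out of range: apply auto.offset.reset. */
    if (!have_start || start < low || start > high)
        start = (reset == RESET_EARLIEST) ? low : high;
    return start;
}
```

With three messages at offsets 0 to 2 (low = 0, high = 3), a group that has committed offset 1 resumes at 1, while a group with nothing committed starts at 3 under latest (past all three messages) and at 0 under earliest.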