2
votes

I have a Kafka console consumer, version 1.1.0, that I use to read messages from Kafka. When I use the kafka-console-consumer.sh script with the --max-messages option, it seems to commit the wrong offsets.

I've created a topic and a consumer group and read some messages:

/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.23:9092 --describe --group my-consumer-group
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test.offset     1          374             374             0               -               -               -
test.offset     0          0               375             375             -               -               -

Then I read 10 messages like this:

/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9092 --topic test.offset --timeout-ms 1000 --max-messages 10 --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
1 var_1
3 var_3
5 var_5
7 var_7
9 var_9
11 var_11
13 var_13
15 var_15
17 var_17
19 var_19
Processed a total of 10 messages

But now the offsets show that it read all the messages in the topic:

/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.23:9092 --describe --group my-consumer-group
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'my-consumer-group' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test.offset     1          374             374             0               -               -               -
test.offset     0          375             375             0               -               -               -

And now when I want to read more messages, I get an error saying there are no more messages in the topic:

/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9092 --topic test.offset --timeout-ms 1000 --max-messages 10 --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
[2020-02-28 08:27:54,782] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
kafka.consumer.ConsumerTimeoutException
        at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:98)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:129)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

What am I doing wrong? Why did the offset move to the last message in the topic instead of advancing by just 10 messages?

1
Did you try this with a lag of more than 500? It can be related to the max.poll.records config, whose default value is 500. – H.Ç.T
Yes, you are right! Thank you very much :) – Athlestan

1 Answer

0
votes

This is about the auto-commit feature of the Kafka consumer. As mentioned in this link:

The easiest way to commit offsets is to allow the consumer to do it for you. If you configure enable.auto.commit=true, then every five seconds the consumer will commit the largest offset your client received from poll(). The five-second interval is the default and is controlled by setting auto.commit.interval.ms. Just like everything else in the consumer, the automatic commits are driven by the poll loop. Whenever you poll, the consumer checks if it is time to commit, and if it is, it will commit the offsets it returned in the last poll.
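For reference, the two settings described in the quote correspond to these consumer properties (the values shown are the defaults mentioned above, and could go in the consumer.properties file the question passes via --consumer.config):

```properties
# Auto-commit behavior described above (defaults)
enable.auto.commit=true
auto.commit.interval.ms=5000
```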

So in your case, when your consumer polls, it receives up to 500 messages (the default value of max.poll.records), and after 5 seconds it commits the largest offset returned by the last poll (375 in your case), even though you specified --max-messages as 10.

--max-messages: The maximum number of messages to consume before exiting. If not set, consumption is continual.
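If the goal is to consume exactly 10 messages without committing past them, one workaround (in line with the comments above) is to cap max.poll.records so each poll never fetches more records than are actually printed. A sketch, reusing the broker address, topic, and paths from the question:

```shell
# Limit each poll to 10 records so the auto-commit cannot advance
# past the messages actually consumed. --consumer-property passes
# an arbitrary consumer config to kafka-console-consumer.sh.
/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.23:9092 \
  --topic test.offset \
  --timeout-ms 1000 \
  --max-messages 10 \
  --consumer-property max.poll.records=10 \
  --consumer.config /kafka_2.11-1.1.0/config/consumer.properties
```

Alternatively, setting max.poll.records=10 (or enable.auto.commit=false, committing manually) inside consumer.properties achieves the same effect without the extra flag.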