
Problem

A consumer with a particular group id connects to a broker, listens to a topic for less than one minute and then disconnects (as required by the business logic). While listening, it may consume some messages. When the same consumer repeats this, it consumes the same messages again!

I discovered that Kafka saves the offset at a one-minute interval, which means the consumer has to listen to the topic for more than one minute before its offset is saved. How can I reduce this interval?

I've found these properties:

  • log.flush.offset.checkpoint.interval.ms
  • log.flush.start.offset.checkpoint.interval.ms
  • offset.flush.interval.ms, which looks the most appropriate

I tried setting them in the server.properties file:

log.flush.offset.checkpoint.interval.ms=6000
log.flush.start.offset.checkpoint.interval.ms=6000
offset.flush.interval.ms=6000

Then I restarted Kafka and ZooKeeper, but it didn't help: the consumer still has to listen to the topic for more than one minute. What am I doing wrong?

My environment

  • Kafka and Zookeeper via Confluent.
  • php-rdkafka as the client library
  • enable.auto.commit is set to true

I use the low-level consumer, with auto.offset.reset set to smallest. Code example:

<?php
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'foo');

// Low-level (legacy) consumer
$kafkaConsumer = new \RdKafka\Consumer($conf);
$kafkaConsumer->addBrokers('queue.a:9092');
$kafkaConsumer->setLogLevel(LOG_DEBUG);

// Start from the earliest offset if the group has no stored offset yet
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$queue = $kafkaConsumer->newQueue();
$topic = $kafkaConsumer->newTopic('topic_name', $topicConf);
// Consume partition 0 into $queue, resuming from the group's stored offset
$topic->consumeQueueStart(0, \RD_KAFKA_OFFSET_STORED, $queue);

while (true) {
    $msg = $queue->consume(2000); // wait up to 2000 ms for a message
    if ($msg !== null) {
        var_dump($msg);
    }
}

Answer

You should try to explicitly commit the offset in your consumer:

Explicitly Committing Offsets in Consumers

If you go with the automatic offset commits, you don’t need to worry about explicitly committing offsets. But you do need to think about how you will commit offsets if you decide you need more control over the timing of offset commits—either in order to minimize duplicates or because you are doing event processing outside the main consumer poll loop.

Extract from Kafka: The Definitive Guide, page 127 (it's a free e-book you can download).

It is recommended that you:

Always commit offsets after events were processed

If you do all the processing within the poll loop and don’t maintain state between poll loops (e.g., for aggregation), this should be easy. You can use the auto-commit configuration or commit events at the end of the poll loop.
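If you want to stay with auto-commit on the low-level RdKafka\Consumer from the question: as far as I know, librdkafka's legacy consumer commits offsets on a timer set by the topic-level auto.commit.interval.ms property, whose default is 60000 ms, which matches the one-minute delay you see. That is a client setting, so changing broker options in server.properties would not affect it. A minimal sketch, assuming the php-rdkafka extension and reusing the broker, group and topic names from the question; consumeForSeconds() is a hypothetical helper name, and the 1000 ms value is only an example:

```php
<?php
// Sketch only: assumes php-rdkafka and the legacy low-level consumer.
// consumeForSeconds() is a hypothetical helper; names come from the question.
function consumeForSeconds(int $maxSeconds): void
{
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 'foo');

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
    // Topic-level setting read by the legacy consumer: commit the offset
    // every 1000 ms instead of librdkafka's 60000 ms default (example value).
    $topicConf->set('auto.commit.interval.ms', '1000');

    $consumer = new \RdKafka\Consumer($conf);
    $consumer->addBrokers('queue.a:9092');

    $queue = $consumer->newQueue();
    $topic = $consumer->newTopic('topic_name', $topicConf);
    $topic->consumeQueueStart(0, \RD_KAFKA_OFFSET_STORED, $queue);

    // Listen for a bounded time, as in the question's business logic.
    $deadline = time() + $maxSeconds;
    while (time() < $deadline) {
        $msg = $queue->consume(2000);
        if ($msg !== null && $msg->err === \RD_KAFKA_RESP_ERR_NO_ERROR) {
            var_dump($msg->payload);
        }
    }

    // Stop fetching partition 0 before the consumer goes away.
    $topic->consumeStop(0);
}
```

This only narrows the window: messages handled in the last interval before the process exits may still be delivered again on the next run.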

I have not used the PHP client myself, but this looks like what you need.

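Adding to your code example above (note that commit() is a method of the high-level RdKafka\KafkaConsumer, not of the low-level RdKafka\Consumer, so this assumes switching to KafkaConsumer):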

while (true) {
    $msg = $queue->consume(2000);
    if ($msg !== null) {
        var_dump($msg);
        $kafkaConsumer->commit($msg);
    }
}
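Because commit() belongs to the high-level RdKafka\KafkaConsumer, here is a minimal sketch of the whole consumer under that assumption. Auto-commit is turned off so that each offset is committed right after its message is handled. It assumes the php-rdkafka extension and reuses the broker, group and topic names from the question; consumeWithCommits() is a hypothetical name:

```php
<?php
// Sketch only: assumes php-rdkafka and the high-level KafkaConsumer.
// consumeWithCommits() is a hypothetical helper; names come from the question.
function consumeWithCommits(int $maxSeconds): void
{
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 'foo');
    $conf->set('metadata.broker.list', 'queue.a:9092');
    // Disable periodic auto-commit; offsets are committed explicitly below.
    $conf->set('enable.auto.commit', 'false');
    // Start from the earliest offset when the group has no committed offset.
    $conf->set('auto.offset.reset', 'earliest');

    $consumer = new \RdKafka\KafkaConsumer($conf);
    $consumer->subscribe(['topic_name']);

    $deadline = time() + $maxSeconds;
    while (time() < $deadline) {
        $msg = $consumer->consume(2000);
        if ($msg->err === \RD_KAFKA_RESP_ERR_NO_ERROR) {
            var_dump($msg->payload);
            // Synchronously commit the position just after this message.
            $consumer->commit($msg);
        }
        // Timeouts and partition-EOF events are simply skipped in this sketch.
    }

    $consumer->close();
}
```

Calling consumeWithCommits(50) would listen for about 50 seconds; since each commit() call is synchronous, a consumer that disconnects early should resume after the last message it handled.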