Problem
A consumer with particular group id connects to a broker, listens topic for less than 1 minute and disconnects (according to business logic). While it listens to the topic it can consume some messages. When the same consumer repeats this action it consumes the same messages!
I discovered that Kafka saves offset with interval 1 minute. It means that the consumer has to listen the topic for more than 1 minute. How can I reduce this interval?
I've found such properties:
log.flush.offset.checkpoint.interval.ms
log.flush.start.offset.checkpoint.interval.ms
offset.flush.interval.ms
- looks the most appropriate
I try to set them in server.properties
file:
log.flush.offset.checkpoint.interval.ms=6000
log.flush.start.offset.checkpoint.interval.ms=6000
offset.flush.interval.ms=6000
Restart Kafka and Zookeeper. But it doesn't help. The consumer still has to listen to the topic for more than 1 minute. What I do wrong?
My environment
- Kafka and Zookeeper via Confluent.
php-rdkafka
as client libraryenable.auto.commit
is set totrue
I use low level consumer. auto.offset.reset
is set to smallest
.
Code example
<?php
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'foo');
$kafkaConsumer = new \RdKafka\Consumer($conf);
$kafkaConsumer->addBrokers('queue.a:9092');
$kafkaConsumer->setLogLevel(LOG_DEBUG);
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$queue = $kafkaConsumer->newQueue();
$topic = $kafkaConsumer->newTopic('topic_name', $topicConf);
$topic->consumeQueueStart(0, \RD_KAFKA_OFFSET_STORED, $queue);
while (true) {
$msg = $queue->consume(2000);
if ($msg !== null) {
var_dump($msg);
}
}