4 votes

I am trying to implement manual offset commit for the messages received on Kafka. I have set enable.auto.commit to false, but the offset value keeps on increasing.

I am not sure what the reason is and need help resolving the issue.

Below is the code.

application.yml

spring:
  application:
    name: kafka-consumer-sample
  resources:
    cache:
      period: 60m

kafka:
  bootstrapServers: localhost:9092
  options:
    enable:
      auto:
        commit: false

KafkaConfig.java

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

KafkaConsumer.java

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}")
    public void consume(ConsumerRecord<String, String> record) {

        System.out.println("Consumed Kafka Record: " + record);
        record.timestampType();
        System.out.println("record.timestamp() = " + record.timestamp());
        System.out.println("***********************************");
        System.out.println(record.timestamp());
        System.out.println("record.key() = " + record.key());
        System.out.println("Consumed String Message : " + record.value());
    }
}

The output is as follows:

Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 31, CreateTime = 1573570989565, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 10)
record.timestamp() = 1573570989565
***********************************
1573570989565
record.key() = null
Consumed String Message : 10
Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 32, CreateTime = 1573570991535, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 11)
record.timestamp() = 1573570991535
***********************************
1573570991535
record.key() = null
Consumed String Message : 11

The consumer properties are as follows:

auto.commit.interval.ms = 100000000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mygroup
heartbeat.interval.ms = 3000

This is after I restart the consumer. I expected the earlier data to be printed as well.

Is my understanding correct? Please note that I am restarting my Spring Boot app expecting the messages to start from the beginning, and my Kafka server and ZooKeeper are not terminated.

I am not using the poll() method. Also, even though the offset will increase automatically for every subsequent poll, it is not happening on the server partition. And how do I solve this problem? :( – Tushar Banne
Okay, can you show the Kafka container config? – Deadpool
There is no other configuration apart from what I have already provided :( . Can you specify what you are looking for so that I can check from my side? – Tushar Banne
@Deadpool I have updated my question with the code for creating the ConcurrentKafkaListenerContainerFactory. – Tushar Banne
Try my answer @Tushar Banne – Deadpool

1 Answer

4 votes

If auto acknowledgement is disabled via ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, the container still commits the offsets for you, because the container-level acknowledgement mode defaults to BATCH. To commit the offsets yourself, set the acknowledgement mode at the container level to MANUAL.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Switch the container to manual acknowledgement so the listener controls offset commits
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
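With MANUAL ack mode the listener has to acknowledge each record itself, for example by adding an Acknowledgment parameter and calling acknowledge() after processing. A minimal sketch based on the listener from the question:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println("Consumed String Message : " + record.value());
        // Commit the offset of this record only after it has been processed
        acknowledgment.acknowledge();
    }
}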

This is because, when auto acknowledgement is disabled, the container-level acknowledgement mode is set to BATCH by default:

public void setAckMode(ContainerProperties.AckMode ackMode)

Set the ack mode to use when auto ack (in the configuration properties) is false.

  1. RECORD: Ack after each record has been passed to the listener.
  2. BATCH: Ack after each batch of records received from the consumer has been passed to the listener
  3. TIME: Ack after this number of milliseconds (should be greater than #setPollTimeout(long) pollTimeout).
  4. COUNT: Ack after at least this number of records have been received
  5. MANUAL: Listener is responsible for acking - use an AcknowledgingMessageListener.

Parameters:

ackMode - the ContainerProperties.AckMode; default BATCH.
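For example, the COUNT mode listed above could be configured on the same factory like this (a sketch only; the ackCount value of 100 is arbitrary):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Let the container commit after at least 100 records have been passed to the listener
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.COUNT);
    factory.getContainerProperties().setAckCount(100);
    return factory;
}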

Committing Offsets

Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, Kafka auto-commits the offsets according to its configuration. If it is false, the containers support several AckMode settings (described in the next list). The default AckMode is BATCH. Starting with version 2.3, the framework sets enable.auto.commit to false unless explicitly set in the configuration. Previously, the Kafka default (true) was used if the property was not set.
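If you happen to be using Spring Boot's auto-configured Kafka beans instead of a custom consumerFactory() bean (which is not what the question does), roughly the same settings can be expressed in application.yml using the standard Spring Boot properties:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: mygroup
      enable-auto-commit: false
      auto-offset-reset: earliest
    listener:
      ack-mode: manual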

And if you always want to read from the beginning, you have to set the auto.offset.reset property to earliest:

config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Note: auto.offset.reset only takes effect when the group has no committed offset, so make sure the groupId is a new one that does not have any committed offsets in Kafka.
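Alternatively, if you want to replay from the beginning on every restart without changing the groupId each time, one option is to implement ConsumerSeekAware in the listener and seek to the beginning when partitions are assigned. A sketch, assuming spring-kafka 2.3+ (where the other ConsumerSeekAware methods have defaults):

import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer implements ConsumerSeekAware {

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Rewind every assigned partition to the earliest available offset on each (re)start
        assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

    // @KafkaListener method as before
}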