
Kafka v2.4 Consumer Configurations:-

kafka.consumer.auto.offset.reset=earliest
kafka.consumer.auto.commit=false

Kafka consumer container config:-

@Bean
public ConcurrentKafkaListenerContainerFactory<String, PayoutDto> kafkaPayoutStatusPoolListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PayoutDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactoryForPayoutEvent());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setMissingTopicsFatal(false);
    return factory;
}

Kafka consumer:-

@KafkaListener(id = "regularPayoutEventConsumer", topics = "${kafka.regular.payout.consumer.queuename}", containerFactory = "kafkaPayoutStatusPoolListenerContainerFactory", groupId = "${kafka.regular.payout.consumer.groupId}")
public void listen(ConsumerRecord<String, PayoutDto> consumerRecord, Acknowledgment ack) {
    StopWatch watch = new StopWatch();
    watch.start();
    String key = null;
    Long offset = null;
    try {
        PayoutDto payoutDto = consumerRecord.value();
        key = consumerRecord.key();
        offset = consumerRecord.offset();
        cpAccountsService.processPayoutEvent(payoutDto);
        ack.acknowledge();
    } catch (Exception e) {
        // Pass the exception as the last argument with no placeholder so SLF4J logs the stack trace.
        log.error("Exception occurred in RegularPayoutEventConsumer due to following issue", e);
    } finally {
        watch.stop();
        log.debug("total time taken by consumer for requestID:" + key + " on offset:" + offset + " is:"
                + watch.getTotalTimeMillis());
    }

}

Success Scenario:-

  1. The consumer fails to acknowledge a record because of an exception, which creates lag. Say the last committed offset is 30 and the lag is now 4.
  2. On the next automatic poll cycle, after the poll interval, the consumer resumes from offset 30, consumes through offset 33 normally, and the lag drops to 0.

Failed Scenario:-

  1. Same as step 1 of the success scenario.
  2. Before the consumer's poll interval elapses, the producer pushes a new message.
  3. On that new producer event, the consumer fetches data and jumps directly to the record at offset 33, skipping 30, 31, and 32 and clearing the lag to 0.

App startup logs of kafka:-

    2021-04-14 10:38:06.132  INFO 10286 --- [  restartedMain] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-RegularPayoutEventGroupId-3, groupId=RegularPayoutEventGroupId] Subscribed to topic(s): InstantPayoutTransactionsEv
    2021-04-14 10:38:06.132  INFO 10286 --- [  restartedMain] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
    2021-04-14 10:38:06.133  INFO 10286 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-PayoutEventGroupId-4
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = PayoutEventGroupId
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 30000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class com.cms.cpa.config.KafkaPayoutDeserializer

    2021-04-14 10:38:06.137  INFO 10286 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
    2021-04-14 10:38:06.137  INFO 10286 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
Sounds implausible - you need to show more code, configuration, logs, etc. Committed offsets can be logged at a higher level using the commitLogLevel container property. - Gary Russell
@GaryRussell, I have updated the question with the required details. - Rohan Dodeja
@GaryRussell, I have created a git repo to better explain the problem; please refer to github.com/rohandodeja/kafka-test-app - Rohan Dodeja
I've tried with your app and it works as expected for me. The only way I can get a lag of 4 is to send 4 bad records with data=1. Then, when I send a new good record, it is received immediately, as expected. The bad records will not be redelivered with your configuration. Perhaps I am misunderstanding your expectations. - Gary Russell
@GaryRussell, the thrown exception is just an example, used to generate lag the way my real application does. The main issue is that when you produce the good record, the consumer directly consumes that latest record. The behavior I'm expecting is that it should first process the old records and then the new ones, but as you saw, you only get the latest record and the lag is cleared immediately. - Rohan Dodeja

1 Answer


Kafka maintains two values for each consumer/partition: the committed offset (where the consumer will start if restarted) and the position (the record that will be returned on the next poll).

Not acknowledging a record does not move the position back; the next poll simply continues from where the previous one left off.
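To make the distinction concrete, here is a minimal plain-Java sketch of the two values. It does not use the Kafka client at all: `ToyConsumer`, its methods, and the offsets are hypothetical stand-ins chosen to mirror the numbers in the question, and only illustrate why unacknowledged records are not returned again until something seeks back.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetPositionDemo {

    /** Toy stand-in for one consumer on one partition; NOT the real KafkaConsumer. */
    static class ToyConsumer {
        long committed; // where a restarted consumer would begin
        long position;  // next offset that poll() will return

        ToyConsumer(long committed) {
            this.committed = committed;
            this.position = committed;
        }

        /** Returns offsets [position, logEnd) and advances the position past them. */
        List<Long> poll(long logEnd) {
            List<Long> batch = new ArrayList<>();
            while (position < logEnd) {
                batch.add(position++);
            }
            return batch;
        }

        /** Acknowledging a record commits the offset of the next record to read. */
        void acknowledge(long offset) {
            committed = offset + 1;
        }

        /** Only an explicit seek moves the position backwards. */
        void seek(long offset) {
            position = offset;
        }
    }

    public static void main(String[] args) {
        ToyConsumer c = new ToyConsumer(30);

        // Offsets 30..32 are delivered, processing fails, nothing is acknowledged.
        System.out.println("poll 1: " + c.poll(33)
                + " committed=" + c.committed + " position=" + c.position);

        // A new record arrives at offset 33; the poll continues from the position.
        System.out.println("poll 2: " + c.poll(34)
                + " committed=" + c.committed + " position=" + c.position);

        // Redelivery needs a seek back to the failed offset.
        c.seek(c.committed);
        System.out.println("poll 3: " + c.poll(34));
    }
}
```

In this toy model the second poll returns only offset 33 while the committed offset is still 30, which matches the "skipped" records described in the question; a restart (or an explicit seek) is what brings 30 to 32 back.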

It is working as-designed; if you want to re-process a failed record, you need to use acknowledgment.nack() with an optional sleep time, or throw an exception and configure a SeekToCurrentErrorHandler.

In those cases, the container will reposition the partitions so that the failed record is redelivered. With the error handler you can "recover" the failed record after the retries are exhausted. When using nack(), the listener has to keep track of the attempts.
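As a rough sketch of both options, assuming a spring-kafka 2.x version in which `SeekToCurrentErrorHandler` and `Acknowledgment.nack(long)` are available (later versions replace them with `DefaultErrorHandler` and `nack(Duration)`): the class names, bean name, topic names, `String` payload type, back-off values, and `process` method below are placeholders, not taken from the question's application.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
class RedeliverySketchConfig {

    // Option 2: let the listener throw and have the error handler seek back.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> sketchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

        // Redeliver a failed record 3 more times, 1 second apart (assumed values),
        // then hand it to the recoverer, which here only logs to stderr.
        SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(
                (record, ex) -> System.err.println("Giving up on offset " + record.offset() + ": " + ex),
                new FixedBackOff(1000L, 3L));
        // With manual ack modes, commit the offset of a recovered record.
        handler.setCommitRecovered(true);
        factory.setErrorHandler(handler);
        return factory;
    }
}

@Component
class RedeliverySketchListener {

    // Option 1: catch the failure and nack; the container re-seeks so this
    // record (and the ones after it) is redelivered after the sleep.
    // The listener itself would have to count attempts to avoid retrying forever.
    @KafkaListener(id = "nackSketch", topics = "sketch-topic-a", containerFactory = "sketchFactory")
    public void listenWithNack(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            process(record.value());
            ack.acknowledge();
        } catch (Exception e) {
            ack.nack(1000L); // sleep 1 second before redelivery
        }
    }

    // Option 2: do not swallow the exception; the error handler configured on
    // the factory repositions the partition and retries.
    @KafkaListener(id = "errorHandlerSketch", topics = "sketch-topic-b", containerFactory = "sketchFactory")
    public void listenWithErrorHandler(ConsumerRecord<String, String> record, Acknowledgment ack) {
        process(record.value());
        ack.acknowledge();
    }

    // Hypothetical business logic standing in for the real processing call.
    private void process(String value) {
        if (value == null) {
            throw new IllegalArgumentException("bad record");
        }
    }
}
```

The key difference from the listener in the question is that a failure is no longer swallowed silently: either `nack` is called or the exception propagates, and in both cases the container moves the position back to the failed record.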

See https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets

and https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling