
Can anyone help with a Kafka consumer that, after some time, stops receiving messages from a topic?

I use Spring Boot 2.0.2 (with Spring Cloud Finchley.RC2), Spring Kafka 2.1.7.RELEASE and Kafka 1.0.1:

 Kafka version : 1.0.1
 Kafka commitId : c0518aa65f25317e

My Consumer Config:

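import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    eventsKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("events"));
        // One consumer thread handles all partitions of the topic.
        factory.setConcurrency(1);

        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Offsets are committed automatically by the Kafka client every second.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        return new DefaultKafkaConsumerFactory<>(props);
    }

}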

Kafka Listener:

@KafkaListener(topics = "${kafka.topic.events}", groupId = "events", clientIdPrefix = "appEvents", containerFactory = "eventsKafkaListenerContainerFactory")
public void receiveEvents(ConsumerRecord<?, ?> consumerRecord) {
    Event event = eventService.processEvent(consumerRecord);
}

I also handle NonResponsiveConsumerEvent:

@EventListener
public void nonResponsiveEventHandler(NonResponsiveConsumerEvent event) {
    log.warn("Consumer has become non-responsive, no poll for {} milliseconds. Listener {}, TopicPartitions {}, Consumer: {}",
            event.getTimeSinceLastPoll(), event.getListenerId(), event.getTopicPartitions(),
            event.getConsumer().toString());
}

What happens is that a newly created topic works fine, but after some time (4-5 hours) we stop receiving messages and a NonResponsiveConsumerEvent is emitted:

Consumer has become non-responsive, no poll for 27531 milliseconds. Listener org.springframework.kafka.KafkaListenerEndpointContainer#0-0, TopicPartitions [events.messages-5, events.messages-0, events.messages-4, events.messages-3, events.messages-2, events.messages-1]
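As a rough model of when this event fires: the Spring Kafka container periodically (every `monitorInterval` seconds) compares the time since the consumer's last `poll()` with `noPollThreshold` multiplied by `pollTimeout`. The stdlib-only sketch below illustrates just that comparison; it is a simplification, not Spring's actual code, and the values used (pollTimeout 1000 ms, noPollThreshold 3.0) are assumed defaults that should be checked against `ContainerProperties` in your Spring Kafka version.

```java
/**
 * Simplified model (not Spring Kafka's implementation) of the check that decides
 * whether a consumer is "non-responsive": the time since the last poll() is
 * compared with noPollThreshold * pollTimeout.
 */
public class NonResponsiveCheck {

    // Assumed defaults; verify against ContainerProperties in your Spring Kafka version.
    static final long POLL_TIMEOUT_MS = 1000L;
    static final float NO_POLL_THRESHOLD = 3.0f;

    /** Returns true when the consumer has gone longer without polling than the threshold allows. */
    static boolean isNonResponsive(long timeSinceLastPollMs, long pollTimeoutMs, float noPollThreshold) {
        return (float) timeSinceLastPollMs / (float) pollTimeoutMs > noPollThreshold;
    }

    public static void main(String[] args) {
        // 27531 ms is the value reported in the log line above.
        System.out.println(isNonResponsive(27531L, POLL_TIMEOUT_MS, NO_POLL_THRESHOLD)); // true
        // A gap of 2 seconds stays under the assumed 3-second threshold.
        System.out.println(isNonResponsive(2000L, POLL_TIMEOUT_MS, NO_POLL_THRESHOLD)); // false
    }
}
```

Under those assumed defaults any gap longer than about 3 seconds triggers the event, so 27531 ms means the consumer thread went roughly 27.5 seconds between polls.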

Full consumer config:

ConsumerConfig values: 
    auto.commit.interval.ms = 1000
    auto.offset.reset = earliest
    bootstrap.servers = [server1, server2, server3]
    check.crcs = true
    client.id = appEvents-0
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = mft
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
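Several timeouts in this dump are related. The Kafka documentation says `heartbeat.interval.ms` must be lower than `session.timeout.ms` and typically no higher than a third of it, and the 1.x consumer refuses to start unless `request.timeout.ms` is greater than both `session.timeout.ms` and `fetch.max.wait.ms`. The stdlib-only sketch below checks those relationships for the dumped values; the class and method names are hypothetical, and it only looks at client-side settings, not at broker or network behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks relationships between consumer timeouts (all values in ms). */
public class ConsumerTimeoutCheck {

    static List<String> check(Map<String, Long> cfg) {
        List<String> problems = new ArrayList<>();
        long heartbeat = cfg.get("heartbeat.interval.ms");
        long session = cfg.get("session.timeout.ms");
        long request = cfg.get("request.timeout.ms");
        long fetchWait = cfg.get("fetch.max.wait.ms");

        // Heartbeats must arrive well within the session timeout.
        if (heartbeat >= session) {
            problems.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeat * 3 > session) {
            problems.add("heartbeat.interval.ms is usually no higher than 1/3 of session.timeout.ms");
        }
        // Kafka 1.x consumers reject request.timeout.ms <= session.timeout.ms or <= fetch.max.wait.ms.
        if (request <= session || request <= fetchWait) {
            problems.add("request.timeout.ms must exceed session.timeout.ms and fetch.max.wait.ms");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Values copied from the ConsumerConfig dump above.
        Map<String, Long> cfg = new HashMap<>();
        cfg.put("heartbeat.interval.ms", 3000L);
        cfg.put("session.timeout.ms", 10000L);
        cfg.put("request.timeout.ms", 305000L);
        cfg.put("fetch.max.wait.ms", 500L);
        System.out.println(check(cfg)); // []
    }
}
```

All of these checks pass for the dumped values, so the timeout settings themselves do not look misconfigured.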

My microservice is deployed in OpenShift, and redeploying the app doesn't solve the problem: when the app starts after a redeploy, a NonResponsiveConsumerEvent is emitted immediately. The only fix is deleting the topic and creating it again.

I think something happened to this topic (broken offsets, perhaps?), but I don't know what.

Does anyone know what could cause this? Thanks.


1 Answer


It's probably a problem communicating with the broker. That event was added (issue here) so that applications can be informed when the Kafka Consumer is "stuck" in poll() for some reason.

There are links on that issue to an open Kafka JIRA ticket.

You might get some clues by looking at the logs (both client and server).