Can anyone help me with a Kafka consumer that stops receiving messages from a topic after some time?
I use Spring Boot 2.0.2 Finchley.RC2 and Spring Kafka 2.1.7.RELEASE, with Kafka 1.0.1:
Kafka version : 1.0.1
Kafka commitId : c0518aa65f25317e
My consumer config:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
            eventsKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("events"));
        factory.setConcurrency(1);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
Kafka listener:
@KafkaListener(topics = "${kafka.topic.events}", groupId = "events", clientIdPrefix = "appEvents",
        containerFactory = "eventsKafkaListenerContainerFactory")
public void receiveEvents(ConsumerRecord<?, ?> consumerRecord) {
    Event event = eventService.processEvent(consumerRecord);
}
I also handle NonResponsiveConsumerEvent:
@EventListener
public void nonResponsiveEventHandler(NonResponsiveConsumerEvent event) {
    log.warn("Consumer has become non-responsive, no poll for {} milliseconds. Listener {}, TopicPartitions {}, Consumer: {}",
            event.getTimeSinceLastPoll(), event.getListenerId(), event.getTopicPartitions(),
            event.getConsumer().toString());
}
What happens is that a newly created topic works fine, but some time later (4-5 hours) we stop receiving messages and a NonResponsiveConsumerEvent is emitted:
Consumer has become non-responsive, no poll for 27531 milliseconds. Listener org.springframework.kafka.KafkaListenerEndpointContainer#0-0, TopicPartitions [events.messages-5, events.messages-0, events.messages-4, events.messages-3, events.messages-2, events.messages-1]
Full consumer config:
ConsumerConfig values:
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [server1, server2, server3]
check.crcs = true
client.id = appEvents-0
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mft
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
My microservice is deployed on OpenShift, and redeploying the app doesn't solve the problem. When the app starts after a redeploy, NonResponsiveConsumerEvent is emitted immediately. The only thing that fixes it is deleting the topic and creating it again.
I think something happened to this topic, maybe the offsets are broken, but I don't know.
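To test the offset theory, I could compare the group's committed offset with the range of offsets still available in each partition. Below is a minimal sketch of that comparison, not something I have running yet. The class and method names (`OffsetCheck`, `State`, `classify`) are hypothetical, and I am assuming the three input values would be read separately with `KafkaConsumer.beginningOffsets`, `KafkaConsumer.endOffsets`, and `KafkaConsumer.committed` for each partition of the topic.

```java
public class OffsetCheck {

    /** Possible states of a group's committed offset relative to the partition log. */
    public enum State { NO_COMMITTED_OFFSET, OUT_OF_RANGE, CAUGHT_UP, LAGGING }

    /**
     * Classifies a committed offset against the offsets available in one partition.
     *
     * @param beginning first offset still available in the log
     * @param end       log end offset (offset the next written record will get)
     * @param committed committed offset of the consumer group, or null if there is none
     */
    public static State classify(long beginning, long end, Long committed) {
        if (committed == null) {
            // No committed position: auto.offset.reset decides where the consumer starts.
            return State.NO_COMMITTED_OFFSET;
        }
        if (committed < beginning || committed > end) {
            // Committed position points outside the log; auto.offset.reset applies here too.
            return State.OUT_OF_RANGE;
        }
        // Inside the valid range: either everything is consumed or records are pending.
        return committed == end ? State.CAUGHT_UP : State.LAGGING;
    }

    public static void main(String[] args) {
        // Hypothetical values for a single partition such as events.messages-0.
        System.out.println(classify(0L, 1500L, 1200L));
    }
}
```

If every partition came back as `CAUGHT_UP` or `LAGGING`, the offsets themselves would look sane and the problem would more likely be in the consumer's connection to the brokers or the group coordinator.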
Does anyone know what could be causing this? Thanks.