I have a Kafka consumer processing messages from a topic named "ABC". It has the following config:
{key.deserializer=class org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=class org.apache.kafka.common.serialization.StringDeserializer, max.poll.records=250, group.instance.id=ABC, group.id=ABC, bootstrap.servers=localhost:9093, auto.commit.interval.ms=50, security.protocol=SSL, enable.auto.commit=true, ssl.truststore.location=, ssl.truststore.password=, ssl.endpoint.identification.algorithm=, client.id=ABC}
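For readers who want to reproduce the setup, here is a minimal sketch of how a config like the one above might be assembled. The class name ConsumerConfigSketch and the method buildConsumerConfig are hypothetical (the post does not show how consumerConfig is built), and the truststore location and password are left out because they are blank in the dump above.

```java
import java.util.Properties;

// Hypothetical helper class; not taken from the original application.
class ConsumerConfigSketch {

    // Builds consumer properties mirroring the values printed in the config dump.
    static Properties buildConsumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9093");
        props.put("group.id", "ABC");
        props.put("group.instance.id", "ABC");
        props.put("client.id", "ABC");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "250");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "50");
        props.put("security.protocol", "SSL");
        // Empty value, as in the dump above.
        props.put("ssl.endpoint.identification.algorithm", "");
        // ssl.truststore.location and ssl.truststore.password are omitted here
        // because their values are blank in the dump above.
        return props;
    }

    public static void main(String[] args) {
        // Print each entry so the result can be compared with the dump above.
        buildConsumerConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The returned Properties object would then be passed to the KafkaConsumer constructor in place of consumerConfig.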
The consumer code is:
Runnable processMessageConsumer = () -> {
    // try-with-resources closes the consumer when the loop exits
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
        log.debug("consumer created {} about to subscribe to topic ABC", consumer);
        consumer.subscribe(Collections.singletonList("ABC"));
        log.debug("subscribed to topic ABC {}", consumer);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                log.debug("about to call consumer.poll");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
                log.debug("return from consumer.poll got records of {}", records);
                if (records.isEmpty()) {
                    log.warn("Message receive queue is silent (no message received over kafka).");
                } else {
                    log.debug("about to call processReceivedMessage");
                    processReceivedMessage(records);
                    consumer.commitAsync();
                }
            } catch (Exception ex) {
                // Any exception, including one thrown by poll, is logged and the loop continues
                log.error("Failed to process received message", ex);
            }
        }
    }
};
Maven pom Kafka dependencies:
<properties>
    <kafka.version>2.3.0</kafka.version>
    ...
</properties>
...
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>
The code has been working fine until recently but is now giving this error:
- DEBUG [ MSG-Consumer-0] n.r.r.s.i.KafkaService$$EnhancerBySpringCGLIB$$30ee2b7b :367 about to call consumer.poll
- ERROR [ MSG-Consumer-0] o.a.k.c.c.internals.AbstractCoordinator :569 [Consumer clientId=ABC, groupId=ABC] Attempt to join group failed due to fatal error: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
- ERROR [ MSG-Consumer-0] n.r.r.s.i.KafkaService$$EnhancerBySpringCGLIB$$30ee2b7b :381 Failed to process received message
-
org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.