
I have a Kafka consumer processing messages from a topic named "ABC", with this config:

{key.deserializer=class org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=class org.apache.kafka.common.serialization.StringDeserializer, max.poll.records=250, group.instance.id=ABC, group.id=ABC, bootstrap.servers=localhost:9093, auto.commit.interval.ms=50, security.protocol=SSL, enable.auto.commit=true, ssl.truststore.location=, ssl.truststore.password=, ssl.endpoint.identification.algorithm=, client.id=ABC}
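For reference, that dumped config could be assembled roughly like this (a sketch; plain string keys are used instead of the `ConsumerConfig` constants so the snippet is self-contained, and the truststore values are left empty exactly as in the dump above):

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Sketch of the consumer config from the dump above.
    static Properties build() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9093");
        p.put("group.id", "ABC");
        p.put("group.instance.id", "ABC");
        p.put("client.id", "ABC");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("max.poll.records", "250");
        p.put("enable.auto.commit", "true");
        p.put("auto.commit.interval.ms", "50");
        p.put("security.protocol", "SSL");
        // truststore values left empty, as in the config dump
        p.put("ssl.truststore.location", "");
        p.put("ssl.truststore.password", "");
        p.put("ssl.endpoint.identification.algorithm", "");
        return p;
    }

    public static void main(String[] args) {
        Properties p = build();
        System.out.println(p.getProperty("group.id"));
        System.out.println(p.getProperty("bootstrap.servers"));
    }
}
```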

the consumer code is:

            Runnable processMessageConsumer = () -> {

                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {

                    log.debug("consumer created {} about to subscribe to topic ABC", consumer);
                    consumer.subscribe(Collections.singletonList("ABC"));
                    log.debug("subscribed to topic ABC {}", consumer);

                    while(!Thread.currentThread().isInterrupted()) {
                        try {
                            log.debug("about to call consumer.poll");
                            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));

                            log.debug("return from consumer.poll got records of {}", records);

                            if (records.isEmpty()) {
                                log.warn("Message receive queue is silent (no message received over kafka).");
                            } else {
                                log.debug("about to call processReceivedMessage");
                                processReceivedMessage(records);
                                consumer.commitAsync();
                            }
                        } catch (Exception ex) {
                            log.error("Failed to process received message", ex);
                        }

                    }
                }
            };
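Incidentally, the loop above exits via the thread's interrupt flag, so whatever runs it needs to deliver an interrupt on shutdown. A minimal stand-alone sketch (no Kafka involved; `Thread.sleep` stands in for `consumer.poll`) of stopping such a loop from an `ExecutorService`:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LoopShutdownSketch {
    public static void main(String[] args) throws Exception {
        AtomicInteger polls = new AtomicInteger();

        Runnable loop = () -> {
            while (!Thread.currentThread().isInterrupted()) {
                polls.incrementAndGet();
                try {
                    Thread.sleep(10); // stands in for consumer.poll(...)
                } catch (InterruptedException e) {
                    // restore the flag so the while condition sees it and exits;
                    // swallowing the interrupt here would leave the loop spinning
                    Thread.currentThread().interrupt();
                }
            }
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(loop);
        Thread.sleep(100);

        executor.shutdownNow(); // interrupts the worker, ending the while loop
        boolean stopped = executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("stopped=" + stopped + " polls>0=" + (polls.get() > 0));
    }
}
```

Note that the `catch (Exception ex)` in the real loop can swallow interruption-related exceptions, so restoring the flag as above matters if you ever catch them yourself.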

Maven POM Kafka dependencies:

<properties>
 <kafka.version>2.3.0</kafka.version>
...
</properties>
...
<dependencies>
       <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>

</dependencies>

The code has been working fine until recently but is now giving this error:

- DEBUG [ MSG-Consumer-0] n.r.r.s.i.KafkaService$$EnhancerBySpringCGLIB$$30ee2b7b                         :367  about to call consumer.poll
- ERROR [ MSG-Consumer-0] o.a.k.c.c.internals.AbstractCoordinator                                          :569  [Consumer clientId=ABC, groupId=ABC] Attempt to join group failed due to fatal error: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
- ERROR [ MSG-Consumer-0] n.r.r.s.i.KafkaService$$EnhancerBySpringCGLIB$$30ee2b7b                          :381  Failed to process received message
org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
Thanks @Armando, this would imply that I have changed the partition assignment strategy, but I have not? - user2553120

1 Answer


So I have now worked out what the issue was. We have standard Kafka code in our library for general-purpose messaging, where each service has its own "inbox" topic on which it receives messages from other services. The topic is named after the service; in this case ABC is the service, so if the XYZ service wants to send a message to the ABC service, it puts the message on the ABC topic. The consumer thread then works out how to dispatch each type of message to the relevant endpoint (message-handling method). And, to make it easy, the consumer also sets the group.id to ABC.

This mechanism had been working along quite nicely until recently, when some stream-consumer code was added to the library. It used its own specific topic but had the application.id (for the streams config) set to the service name, so in this case it would again be "ABC". This is what caused the InconsistentGroupProtocolException.

The thing is, under the covers Kafka Streams uses the application.id as the consumer group.id, so we then had different topics sharing the same group.id, and Kafka does not like this too much: the plain consumer and the streams client advertise different partition-assignment protocols, so they cannot coexist in one consumer group. The fix was quite easy: I just appended the class name of the streams handler to the application.id, and that resolved it.
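A sketch of the fix (class and method names here are illustrative, not our actual library code): the plain consumer keeps group.id equal to the service name, while the streams config derives its application.id from both the service name and the handler class, so the two clients never end up in the same group:

```java
import java.util.Properties;

public class GroupIdFix {

    // Plain consumer: uses group.id directly.
    static Properties consumerConfig(String serviceName) {
        Properties p = new Properties();
        p.put("group.id", serviceName);
        return p;
    }

    // Kafka Streams: application.id becomes the group.id under the covers,
    // so make it distinct by appending the handler class name.
    static Properties streamsConfig(String serviceName, Class<?> handler) {
        Properties p = new Properties();
        p.put("application.id", serviceName + "-" + handler.getSimpleName());
        return p;
    }

    public static void main(String[] args) {
        String consumerGroup = consumerConfig("ABC").getProperty("group.id");
        String streamsGroup = streamsConfig("ABC", String.class).getProperty("application.id");
        System.out.println(consumerGroup);
        System.out.println(streamsGroup);
        System.out.println(!consumerGroup.equals(streamsGroup)); // groups are distinct
    }
}
```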

So: for different topics, use distinct group.id/application.id values.