8
votes

I'm facing an issue related to Kafka.

I'm having my current service (Producer) that sends the message to a Kafka topic (events). The service is using kafka_2.12 v1.0.0, written in Java.

I'm trying to integrate it with the sample project of spark-streaming as a Consumer service (here using kafka_2.11 v0.10.0, written in Scala)

The message is sent successfully from Producer to the Kafka topic. However, I always receive the error stack below:

Exception in thread "main" org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members.
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)    at com.jj.streaming.ItemApp$.delayedEndpoint$com$jj$streaming$ItemApp$1(ItemApp.scala:72)
    at com.jj.streaming.ItemApp$delayedInit$body.apply(ItemApp.scala:12)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)     at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)   at scala.App$class.main(App.scala:76)
    at com.jj.streaming.ItemApp$.main(ItemApp.scala:12)
    at com.jj.streaming.ItemApp.main(ItemApp.scala)

I don't know the root cause. How can I fix this?

2
Tin Nguyen, did you find the solution for this? i'm also facing the same issue - Satish
hi @Satish, no I didn't. - Tin Nguyen

2 Answers

4
votes

This happens in my configuration when I attempt to add a consumer to the cluster that is using a different partition assignment strategy from the previous ones.

For example:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RandomAccessAssignor

mixed with or defaulted to:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor

0
votes

As pointed out by @john Cairns and @Iraj Hedyati, check assignment strategy assigned to the consumer group. Different clients create consumer groups with different default strategies. for example

when I used kafka command-line client ( java ), it has created a consumer group using the 'range' strategy.

/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group aeprocessor5 --state

GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
aeprocessor5              172.16.1.11:9092 (1003)   range                Stable          1

whereas when I created consumer group using go client using sarama library, it's using 'round-robin strategy.

/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group aeprocessor5 --state

GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
aeprocessor5              172.16.1.11:9092 (1003)   roundrobin           Stable          1

so, if the group already exists and has a different strategy then InconsistentGroupProtocolException is reported.