
I have a Kafka Streams app that connects to our Kafka cluster using the Kafka Streams DSL, like so:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);

// do work

kStreams = new KafkaStreams(builder, config);
kStreams.start();

Another part of my code base establishes a connection to our cluster using the consumer client directly:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();

The reason I am doing this is to gather metadata about the consumer group before conditionally kicking off other parts of the app (which include the Kafka Streams topology). There are probably other ways to do this (e.g. through various hooks or whatnot), but I am more curious about why mixing these two approaches will intermittently lead to an InconsistentGroupProtocolException being thrown.

Could someone please shed some light on why this is being thrown? I'm having a difficult time determining exactly what is going on from the source code itself, but I guess the underlying consumers that are constructed by Kafka Streams are specifying a different partition assignment protocol than the KafkaConsumer client. Anyway, any help in understanding this exception would be greatly appreciated.

What are you trying to accomplish? - Matthias J. Sax
issues.apache.org/jira/browse/KAFKA-4113 See the comment from 03/Jan/17. I am experiencing this issue and thought this was the easiest solution - foxygen
I see. I guess it might work if you can make sure that either the consumer OR the application is active, never both. Thus, before you start one or the other, make sure that there are no members in the consumer group, i.e., the group is not active. Cf. github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/… (Be aware, though, that this call is not part of the public API and can change at any point without notice.) - Matthias J. Sax

1 Answer

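You answered the question yourself. Kafka Streams uses a custom partition assignor, and a Kafka Streams client only works with other Kafka Streams clients. If you use a KafkaConsumer with the same group ID as your Kafka Streams app, its attempt to join the group fails because its assignment protocol is not compatible with the one the Streams members use. This is intentional: it fences plain KafkaConsumers off from joining the Kafka Streams consumer group. It is also why you only see the exception intermittently: it can only occur while both kinds of client are trying to be members of the group at the same time. In short, a plain KafkaConsumer cannot "play" with Kafka Streams.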
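
To make the mechanism concrete, here is a minimal sketch of the kind of compatibility check the group coordinator applies when a member joins. It is a simplified model, not Kafka's actual code: the class GroupProtocolModel and its methods join and leaveAll are hypothetical names made up for illustration. The protocol names are the ones I believe the assignors advertise: "stream" for the Kafka Streams assignor and "range" for a plain KafkaConsumer's default assignor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the coordinator's join check (not Kafka's real code).
final class GroupProtocolModel {
    // One entry per current member: the assignment protocol names it supports.
    private final List<Set<String>> members = new ArrayList<>();

    // Returns true if the member is admitted. Returns false in the situation where
    // the real broker would reply with INCONSISTENT_GROUP_PROTOCOL.
    boolean join(Set<String> supportedProtocols) {
        if (!members.isEmpty()) {
            // Protocols that every current member supports...
            Set<String> common = new HashSet<>(members.get(0));
            for (Set<String> member : members) {
                common.retainAll(member);
            }
            // ...must overlap with what the newcomer supports.
            common.retainAll(supportedProtocols);
            if (common.isEmpty()) {
                return false;
            }
        }
        members.add(new HashSet<>(supportedProtocols));
        return true;
    }

    // Models all members leaving, i.e. the group becoming inactive.
    void leaveAll() {
        members.clear();
    }

    public static void main(String[] args) {
        GroupProtocolModel group = new GroupProtocolModel();
        // A Kafka Streams instance joins the empty group.
        System.out.println(group.join(Collections.singleton("stream"))); // true
        // A plain consumer with the same group ID tries to join while Streams is active.
        System.out.println(group.join(Collections.singleton("range")));  // false
        // Once the group is empty again, the plain consumer can join.
        group.leaveAll();
        System.out.println(group.join(Collections.singleton("range")));  // true
    }
}
```

The last two calls show why the timing matters: the same consumer is rejected while a Streams member is in the group and accepted once the group is empty. Giving the metadata-gathering consumer its own group ID avoids the clash entirely, at the cost of no longer sharing the Streams application's group.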