2
votes

I'm using Kafka 0.8.2 and want to use the offset fetch and commit API. As the documentation says:

The offsets for a given consumer group are maintained by a specific broker called the offset coordinator. i.e., a consumer needs to issue its offset commit and fetch requests to this specific broker. It can discover the current offset coordinator by issuing a consumer metadata request

So I send a ConsumerMetadataRequest, but instead of getting a valid response I always get ConsumerCoordinatorNotAvailableCode, which the documentation describes as:

The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.

If I use Kafka 0.8.2-beta instead, the issue does not occur.

I'm using the Go client sarama, and I created the __consumer_offsets topic before fetching the metadata. Here is my broker config:

broker.id=1
port=9091
host.name=192.168.33.10
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=data/9091
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

2 Answers

1
votes

I've just found a similar problem. The issue is that the consumer has not issued a ConsumerMetadataRequest for the consumer group.

What I've found is that you have to send a ConsumerMetadataRequest and, if the response carries the ConsumerCoordinatorNotAvailableCode error, retry it with backoff until it succeeds.

0
votes

I've just found that it was caused by my Scala version: with the Scala 2.9.1 build it succeeds, but with the 2.10 build it fails. I'm not familiar with Scala, so I haven't dug any deeper into it.