2
votes

Is there any configuration to enable automatic group coordinator recovery after a crash?

I have a test topology with 3 brokers. Once the broker acting as Group Coordinator is shut down, the topic partitions (2 partitions with rf=2) get correctly rebalanced and the producer is not affected, but the consumer group stops receiving messages. If I shut down any other broker instead, everything works as expected.

I am using the Java Kafka Clients API 0.10.2.1 for both the producer and the consumer:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>0.10.2.1</version>
</dependency>

Monitoring the console output of each broker that remains running, I don't find any reference to a new GroupCoordinator assignment. All consumers resume receiving messages as soon as I restart the original group coordinator broker. The broker elected as coordinator is always broker.id=0, no matter the startup order.

Client Config:

private static Consumer<String, FixMessage> createFixMessageConsumer(int id) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6100");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, MYCONSUMERGROUP);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, id + "");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // The deserializers passed to the constructor take precedence, so the
        // (KEY|VALUE)_DESERIALIZER_CLASS_CONFIG properties are not needed here.
        return new KafkaConsumer<>(props, new StringDeserializer(), new FixMessageDeserializer());
    }

Consumer Worker snippet:

    @Override
    public void run() {
        try {
            consumer.subscribe(topics);

            while (true) {
                ConsumerRecords<String, FixMessage> records = consumer.poll(2000);
                FixMessage message = null;
                for (ConsumerRecord<String, FixMessage> record : records) {
                    message = record.value();
                    message.setConsumerId(id);
                    message.setKafkaPartition(record.partition());
                    message.setPartitionOffset(BigInteger.valueOf(record.offset()));
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    if (message.getIdfixMessage() == null) {
                        createFixMessage(message, data);
                    }
                    data.put("value", message.getIdfixMessage());
                    System.out.println(this.id + ": " + data);
                }
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        } catch (Exception e) {
            System.out.println(e.toString());
        } finally {
            consumer.close();
        }
    }
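
As a side note, the WakeupException branch above is only hit if another thread calls consumer.wakeup(); a minimal sketch of such a shutdown hook (the method name shutdown() is an assumption, not part of the original worker) would be:

    // Called from another thread to stop the worker; wakeup() is the only
    // KafkaConsumer method that is safe to call concurrently with poll().
    public void shutdown() {
        consumer.wakeup();   // makes the blocked poll() throw WakeupException
    }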
2   You could check the replication factor of topic __consumer_offsets to see if it's 1. – amethystic
    You are right, @amethystic! Once I increased the RF for the __consumer_offsets topic, group coordinator failure recovery happens as expected. Please submit an answer. – Yamada

2 Answers

3
votes

Ensure the replication factor of the topic __consumer_offsets is greater than 1 in your case. Before 0.11.0.0, the broker-side parameter offsets.topic.replication.factor was not enforced when this internal topic was auto-created, so it is very likely its actual RF ended up lower than the replication factor you configured (for example 1, if only one broker was up when the first consumer connected).
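
To verify what RF the internal topic actually got, a quick sketch from the same 0.10.2.1 client (any consumer already connected to the cluster will do; kafka-topics.sh --describe --topic __consumer_offsets shows the same information):

    // Requires org.apache.kafka.common.PartitionInfo; "consumer" is any connected KafkaConsumer.
    for (PartitionInfo p : consumer.partitionsFor("__consumer_offsets")) {
        System.out.println("partition " + p.partition()
                + " -> " + p.replicas().length + " replica(s)");
    }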

1
votes

I was having the same issue with Kafka 2.11-1.0.0. That is, if the broker hosting the consumer group coordinator was shut down during consumption, no new coordinator was being discovered. As a result, message consumption halted completely, even though the producer could keep producing to the newly elected leader (a new leader did get elected, because one of the partitions was on the shut-down broker and was auto-reassigned to one of the ISRs).

After updating the replication factor of the internal topic __consumer_offsets to 3 (I have a cluster of 3 brokers), automatic failover of the consumer group coordinator started happening. All messages that had been successfully produced were consumed once the new consumer group coordinator was discovered.

To increase the RF of the internal topic __consumer_offsets, refer to: http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
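
As a rough illustration of the procedure from the linked docs (the ZooKeeper address, file name and broker ids are placeholders, and __consumer_offsets has 50 partitions by default, so the real JSON file must list every partition, not just partition 0):

    increase-rf.json:
    {
      "version": 1,
      "partitions": [
        { "topic": "__consumer_offsets", "partition": 0, "replicas": [0, 1, 2] }
      ]
    }

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
        --reassignment-json-file increase-rf.json --execute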