
I have an installation of two Kafka 2.1.0 brokers on a single Windows host. The default replication factor is set to 2; all other settings are defaults.
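For reference, a two-broker setup like this is typically driven by two server.properties files along these lines (the broker ids, listener addresses, and log paths below are assumptions inferred from the log output, not the actual configs):

```properties
# broker 1 (server-1.properties) -- assumed values
broker.id=1
listeners=PLAINTEXT://192.168.0.1:9092
log.dirs=C:/kafka/logs-1
default.replication.factor=2

# broker 2 (server-2.properties) -- assumed values
broker.id=2
listeners=PLAINTEXT://192.168.0.1:19092
log.dirs=C:/kafka/logs-2
default.replication.factor=2
```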

A producer is able to write messages to the log even if I turn off one of the brokers, but a consumer stops consuming messages in this case. Even if I restart the consumer, it doesn't get any partitions assigned; it just writes this warning to the log:

main - org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-1, groupId=sout] Connection to node -2 (/192.168.0.1:19092) could not be established. Broker may not be available.

A consumer:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public final class PumpToConsoleSimple {

  private static final Duration pollTimeout = Duration.ofSeconds(10);

  public static void main(String[] args) {
    final Properties consumerProperties = new Properties();

    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "sout");
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.1:19092");
    consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

    try (final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProperties)) {
      kafkaConsumer.subscribe(Collections.singleton("test"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
          //do nothing
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
          System.out.println("Partitions were assigned");
          kafkaConsumer.seekToBeginning(partitions);
        }
      });
      while (true) {
        final ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(pollTimeout);
        consumerRecords.forEach(r -> System.out.println(r.value()));
        kafkaConsumer.commitSync();
      }
    }
  }
}

A producer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.locks.LockSupport;

public final class OnceInASecondProducerSimple {
  public static void main(String[] args) {
    final Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.1:19092");

    long counter = 0;
    // Create the producer once and reuse it; recreating it per message is
    // expensive and forces a flush/close on every send.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
      while (true) {
        producer.send(new ProducerRecord<>("test", "msg" + counter++));
        // Note: Duration.getNano() returns only the nanos-of-second field,
        // which is 0 for a whole second; toNanos() gives the full duration.
        LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
      }
    }
  }
}

The consumer resumes work only after I start the stopped broker again.

What have I missed? How do I get high availability for a Kafka consumer?

Connection to node -2 (/192.168.0.1:19092) ... 19092? Are you mistakenly specifying a wrong port? – amethystic

No. There are two brokers on the same machine: the first uses port 9092 and the second uses 19092. – Dmitrii Apanasevich

1 Answer


Check the state of the offsets topic __consumer_offsets with the kafka-topics script. The failed broker is probably the group coordinator for your consumer group, and the replication factor of __consumer_offsets may be 1, so the consumer cannot find a coordinator on the surviving broker. Even if you restart the consumer, it still fails to look up the coordinator.
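Assuming a local ZooKeeper on the default port 2181 (in Kafka 2.1 the kafka-topics tool still connects via --zookeeper), the check would look something like:

```shell
:: Describe the internal offsets topic. In the output, look at
:: ReplicationFactor and at the Replicas/Isr lists of each partition:
:: if ReplicationFactor is 1, offsets live on a single broker only.
bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe --topic __consumer_offsets
```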

In your case, you could bump the replication factor of __consumer_offsets up to 2 and retry to see whether the consumer fails over as expected.
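Raising the replication factor of an existing topic is done with the partition-reassignment tool. A sketch, assuming broker ids 0 and 1 and the default 50 partitions of __consumer_offsets (adjust the ids and partition count to match what kafka-topics --describe reports):

```shell
:: reassign.json must list every partition of the topic and assign it to
:: both brokers, one entry per partition, for example:
::   {"version":1,"partitions":[
::     {"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
::     {"topic":"__consumer_offsets","partition":1,"replicas":[1,0]}
::   ]}
:: (repeat for all 50 partitions, alternating the preferred leader)
bin\windows\kafka-reassign-partitions.bat --zookeeper localhost:2181 ^
    --reassignment-json-file reassign.json --execute
```

After the reassignment completes, each offsets partition has a replica on both brokers, so a surviving broker can take over as group coordinator when the other one goes down.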