5
votes

We have kafka cluster with 4 brokers and some topics with replica factor 1 and 10 partitions. At one moment 2 of 4 our servers with kafka cluster - fail. So now we have 2 brokers with same topics. When im run command ./kafka_topics.sh --zookeeper localhost:2181 --describe im get this:

Topic:outcoming-notification-error-topic        PartitionCount:10       ReplicationFactor:1     Configs:
    Topic: outcoming-error-topic       Partition: 0    Leader: 2       Replicas: 2     Isr: 2
    Topic: outcoming-error-topic       Partition: 1    Leader: 3       Replicas: 3     Isr: 3
    Topic: outcoming-error-topic       Partition: 2    Leader: 4       Replicas: 4     Isr: 4
    Topic: outcoming-error-topic       Partition: 3    Leader: 1       Replicas: 1     Isr: 1
    Topic: outcoming-error-topic       Partition: 4    Leader: 2       Replicas: 2     Isr: 2
    Topic: outcoming-error-topic       Partition: 5    Leader: 3       Replicas: 3     Isr: 3
    Topic: outcoming-error-topic       Partition: 6    Leader: 4       Replicas: 4     Isr: 4
    Topic: outcoming-error-topic       Partition: 7    Leader: 1       Replicas: 1     Isr: 1
    Topic: outcoming-error-topic       Partition: 8    Leader: 2       Replicas: 2     Isr: 2
    Topic: outcoming-error-topic       Partition: 9    Leader: 3       Replicas: 3     Isr: 3

How can i delete Leader 2...4 ? or may be i need delete partition for this Leader , but how ?

UPD..

Also we use kafka_exporter for monitoring kafka with prometheus. After 2 brokers down in log of kafka_exporter we get this errors:

level=error msg="Cannot get oldest offset of topic outcoming-error-topic partition  10: kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes." source="kafka_exporter.go:296"
1

1 Answers

5
votes

You could use Kafka's kafka-reassign-partitions.sh to do that. You have two ways, one is generating a proposal of new assignments, and the other one is manually specifying the leaders for specific partitions.


1 . Generate a proposal

The first method, as specified on the kafka docs, follows this logic:

1.1 Generate Proposed partition reassignment configuration

First, you should create a json file such as the provided in the link. Let's name it topics.json.

{
  "topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
  "version":1
}

This will tell kafka what are the topics you are willing to rellocate their partitions from. In the example, he wants Kafka to make a proposal for topics foo1 and foo2.

With that json, call the tool and set the active broker list in the command:

kafka-reassign-partitions.sh --zookeeper $ZK_HOSTS 
--topics-to-move-json-file topics.json --broker-list "1,2,3,4,5" --generate

This will output Kafka's proposal, which you can save into another .json file. For example:

{
  "version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
              {"topic":"foo1","partition":0,"replicas":[5,6]},
              {"topic":"foo2","partition":2,"replicas":[5,6]},
              {"topic":"foo2","partition":0,"replicas":[5,6]},
              {"topic":"foo1","partition":1,"replicas":[5,6]},
              {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

You can manually modify some of the assignments, if you want to (or think it's the proper think to do, as the tool is not perfect). Save the json into a file, for example, reassign-example.json, which will be used in the next step.

1.2. Execute the Proposed partition reassignment

Let's make Kafka execute the proposal and move the partitions. For that, execute:

bin/kafka-reassign-partitions.sh --zookeeper $ZK_HOSTS 
 --reassignment-json-file reassign-example.json --execute

This will execute the partition movement defined on the reassign-example.json file.


2 . Manual specification

The second method is fairly easier, but you must manually identify the partitions you want to reassign. For example, if you want partition 1 of topic XXX to move to brokers 5 and 6, you could create a json file (manual-reassign.json) such as:

{"version":1,"partitions":[{"topic":"XXX","partition":1,"replicas":[5,6]}]}

The way it's launched is the same as in the previous method:

bin/kafka-reassign-partitions.sh --zookeeper $ZK_HOSTS 
 --reassignment-json-file manual-reassign.json --execute