After a Kafka topic has been created by a producer or an administrator, how would you change the number of replicas of this topic?
9 Answers
Edit: I was proven wrong - please check the excellent answer from Łukasz Dumiszewski.
I'm leaving my original answer here for completeness for now.
I don't think you can. Normally it would be something like
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic test2 --replication-factor 3
but it says
Option "[replication-factor]" can't be used with option"[alter]"
It is funny that you can change the number of partitions on the fly (which is often a hugely destructive action when done at runtime), but cannot increase the replication factor, which should be transparent. But remember, it is 0.10, not 10.0... See the enhancement request at https://issues.apache.org/jira/browse/KAFKA-1543
To increase the number of replicas for a given topic you have to:
1. Specify the extra replicas in a custom reassignment json file
For example, you could create increase-replication-factor.json and put this content in it:
{"version":1,
"partitions":[
{"topic":"signals","partition":0,"replicas":[0,1,2]},
{"topic":"signals","partition":1,"replicas":[0,1,2]},
{"topic":"signals","partition":2,"replicas":[0,1,2]}
]}
2. Use the file with the --execute option of the kafka-reassign-partitions tool
[or kafka-reassign-partitions.sh - depending on the kafka package]
For example:
$ kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
3. Verify the replication factor with the kafka-topics tool
[or kafka-topics.sh - depending on the kafka package]
$ kafka-topics --zookeeper localhost:2181 --topic signals --describe
Topic:signals PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=1000000000
Topic: signals Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1
Topic: signals Partition: 1 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1
Topic: signals Partition: 2 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1
See also: the part of the official documentation that describes how to increase the replication factor.
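Writing the reassignment file from step 1 by hand gets tedious once a topic has more than a few partitions. The following is only a minimal sketch of how it could be generated; the function name make_reassignment and its arguments are hypothetical and not part of any Kafka tool, and it assigns the same broker list to every partition.

```shell
# Hypothetical helper (not part of Kafka): print a reassignment JSON for one topic.
# Usage: make_reassignment TOPIC PARTITION_COUNT BROKER_IDS
make_reassignment() {
  local topic="$1"
  local pcount="$2"
  local brokers="$3"
  local i=0
  local sep
  printf '{"version":1,\n "partitions":[\n'
  while [ "$i" -lt "$pcount" ]; do
    # every entry except the last one is followed by a comma
    sep=","
    if [ "$i" -eq $((pcount - 1)) ]; then sep=""; fi
    printf '  {"topic":"%s","partition":%d,"replicas":[%s]}%s\n' \
      "$topic" "$i" "$brokers" "$sep"
    i=$((i + 1))
  done
  printf ' ]}\n'
}
```

For the example above it could be called as: make_reassignment signals 3 0,1,2 > increase-replication-factor.json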
Łukasz Dumiszewski's answer is correct but manually generating that file is a bit hard. Luckily there are some easy ways to achieve what @Łukasz Dumiszewski said.
- If you are using the kafka-manager tool, from version 2.0.0.2 you can change the replication factor in the Generate Partition Assignment section of a topic view. Then click Reassign Partitions to apply the generated partition assignment (if you select a different replication factor, you will get a warning, but you can click Force Reassign afterward).
- If you have ruby installed you can use this helper script
- If you prefer nodejs you can generate the file with this gist too.
This script may help if you want to change the replication factor for all topics (note that it only reassigns partition 0 of each topic):
#!/bin/bash
topics=`kafka-topics --list --zookeeper zookeeper:2181`
while read -r line; do lines+=("$line"); done <<<"$topics"
echo '{"version":1,
"partitions":[' > tmp.json
for t in $topics; do
if [ "${t}" == "${lines[-1]}" ]; then
echo " {\"topic\":\"${t}\",\"partition\":0,\"replicas\":[0,1,2]}" >> tmp.json
else
echo " {\"topic\":\"${t}\",\"partition\":0,\"replicas\":[0,1,2]}," >> tmp.json
fi
done
echo ' ]
}' >> tmp.json
kafka-reassign-partitions --zookeeper zookeeper:2181 --reassignment-json-file tmp.json --execute
You can also use kafkactl for this:
# first run with --validate-only to see what kafkactl will do
kafkactl alter topic my-topic --replication-factor 2 --validate-only
# then do the replica reassignment
kafkactl alter topic my-topic --replication-factor 2
Note that the Kafka API that kafkactl is using for this is only available for Kafka ≥ 2.4.0.
Disclaimer: I am a contributor to this project
The scripted answer of @Дмитрий-Шепелев did not include a solution for topics with multiple partitions. This updated version does:
#!/bin/bash
brokerids="1,2,3"
topics=`kafka-topics --list --zookeeper zookeeper:2181`
while read -r line; do lines+=("$line"); done <<<"$topics"
echo '{"version":1,
"partitions":['
for t in $topics; do
sep=","
pcount=$(kafka-topics --describe --zookeeper zookeeper:2181 --topic "$t" | awk '{print $2}' | uniq -c | awk 'NR==2{print $1}')
for i in $(seq 0 $((pcount - 1))); do
if [ "${t}" == "${lines[-1]}" ] && [ "$((pcount - 1))" == "$i" ]; then sep=""; fi
randombrokers=$(echo "$brokerids" | sed -r 's/,/ /g' | tr " " "\n" | shuf | tr "\n" "," | head -c -1)
echo " {\"topic\":\"${t}\",\"partition\":${i},\"replicas\":[${randombrokers}]}$sep"
done
done
echo ' ]
}'
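Note: it also randomizes the broker order for each partition. As written, every ID in brokerids becomes a replica, so the replication factor equals the number of IDs listed (pass -n 2 to shuf to pick only two per partition). Make sure the broker IDs in the script are correctly defined.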
Execute as follows:
$ ./reassign.sh > reassign.json
$ kafka-reassign-partitions --zookeeper zookeeper:2181 --reassignment-json-file reassign.json --execute
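If only some of the listed brokers should hold a replica of each partition, the shuffle step can be limited to a fixed number of IDs. This is a rough sketch, assuming GNU coreutils (shuf) is available; the function name pick_replicas is hypothetical.

```shell
# Hypothetical helper: pick COUNT distinct broker IDs at random from a
# comma-separated list and print them comma-separated on one line.
# Assumes GNU coreutils `shuf` is installed.
pick_replicas() {
  local brokerids="$1"
  local count="$2"
  # one ID per line -> keep COUNT random lines -> join with commas again
  echo "$brokerids" | tr ',' '\n' | shuf -n "$count" | paste -sd, -
}
```

Calling pick_replicas "1,2,3" 2 prints two distinct IDs from the list in random order, which could replace the randombrokers assignment in the script.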
If you have a lot of partitions, using kafka-reassign-partitions to generate the json file required by Łukasz Dumiszewski's answer (and the official documentation) can be a timesaver. Here is an example of replicating a 64-partition topic from 1 to 2 servers without having to specify all the partitions:
expand_topic=TestTopic
current_server=111
new_servers=111,222
echo '{"topics": [{"topic":"'${expand_topic}'"}], "version":1}' > /tmp/topics-to-expand.json
/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file /tmp/topics-to-expand.json --broker-list "${current_server}" --generate | tail -1 | sed s/\\[${current_server}\\]/\[${new_servers}\]/g | tee /tmp/topic-expand-plan.json
/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/topic-expand-plan.json --execute
/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic ${expand_topic}
Outputs:
Topic:TestTopic PartitionCount:64 ReplicationFactor:2 Configs:retention.ms=6048000
Topic: TestTopic Partition: 0 Leader: 111 Replicas: 111,222 Isr: 111,222
Topic: TestTopic Partition: 1 Leader: 111 Replicas: 111,222 Isr: 111,222
....
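The key step in the pipeline above is the sed substitution, which rewrites every single-broker replica list in the generated plan. To see what it does without touching a cluster, it can be tried on a sample plan first. A small sketch, where the function name expand_replicas is hypothetical:

```shell
# Hypothetical helper: read a reassignment plan on stdin and replace every
# replica list that contains only CURRENT with the list NEW.
expand_replicas() {
  local current="$1"
  local new="$2"
  # \[ and \] match literal brackets; the replacement is inserted verbatim
  sed "s/\[${current}\]/[${new}]/g"
}

# Example on a hand-written sample plan (not real tool output):
echo '{"topic":"TestTopic","partition":0,"replicas":[111]}' | expand_replicas 111 111,222
```

Because the pattern includes both brackets, replica lists that already contain more than one broker are left unchanged.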
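1. Copy all topics to a json file (the script below is saved as alltopics.sh)
#!/bin/bash
topics=`kafka-topics.sh --zookeeper localhost:2181 --list`
while read -r line; do lines+=("$line"); done <<<"$topics"
echo '{"version":1,
"topics":['
for t in $topics; do
# omit the comma after the last topic so the file is valid JSON
sep=","
if [ "${t}" == "${lines[-1]}" ]; then sep=""; fi
echo " { \"topic\": \"${t}\" }${sep}"
done
echo ' ]
}'
bash alltopics.sh > alltopics.json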
2. Run kafka-reassign-partitions.sh to generate rebalanced file
kafka-reassign-partitions.sh --zookeeper localhost:2181 --broker-list "0,1,2" --generate --topics-to-move-json-file alltopics.json > reassign.json
3. Clean up the reassign.json file: it contains both the existing and the proposed assignment, so keep only the proposed JSON
4. Run kafka-reassign-partitions.sh to rebalance topics
kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --execute
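The cleanup in step 3 can be scripted instead of done by hand. The sketch below assumes that the --generate output prints the proposed plan as a single JSON line after a header starting with "Proposed partition reassignment configuration"; check the output of your Kafka version first, since this layout is an assumption here. The function name extract_proposed is hypothetical.

```shell
# Hypothetical helper: read the --generate output on stdin and print only the
# proposed assignment. Assumes the plan is the first non-empty line after a
# header starting with "Proposed partition reassignment configuration".
extract_proposed() {
  awk '
    /^Proposed partition reassignment configuration/ { found = 1; next }
    found && NF { print; exit }
  '
}
```

With this, step 2 could pipe its output through extract_proposed before redirecting to reassign.json.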
To increase the number of replicas for a given topic you have to:
1. If you also want more partitions (this is not required for changing the replication factor), add them to the existing topic with the command below (let us say increasing from 2 to 3)
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-to-increase --partitions 3
2. Specify the extra replicas in a custom reassignment json file
For example, you could create increase-replication-factor.json and put this content in it:
{"version":1,
"partitions":[
{"topic":"topic-to-increase","partition":0,"replicas":[0,1,2]},
{"topic":"topic-to-increase","partition":1,"replicas":[0,1,2]},
{"topic":"topic-to-increase","partition":2,"replicas":[0,1,2]}
]}
3. Use the file with the --execute option of the kafka-reassign-partitions tool
bin/kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
4. Verify the replication factor with the kafka-topics tool
bin/kafka-topics --zookeeper localhost:2181 --topic topic-to-increase --describe
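The verification step can also be automated by counting the broker IDs on each partition line of the --describe output. This is only a sketch, assuming each partition line contains a "Replicas:" field followed by a comma-separated ID list, as in the outputs shown in the answers above; the function name check_replication_factor is hypothetical.

```shell
# Hypothetical helper: read `kafka-topics --describe` output on stdin and
# succeed only if every partition line lists exactly EXPECTED replicas.
check_replication_factor() {
  awk -v expected="$1" '
    {
      for (i = 1; i < NF; i++) {
        if ($i == "Replicas:") {
          seen++
          # count the comma-separated broker IDs that follow "Replicas:"
          n = split($(i + 1), ids, ",")
          if (n != expected) bad++
        }
      }
    }
    END {
      if (seen > 0 && bad == 0) exit 0
      exit 1
    }
  '
}
```

For example: bin/kafka-topics --zookeeper localhost:2181 --topic topic-to-increase --describe | check_replication_factor 3 && echo "replication factor is 3"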