You could use MirrorMaker (version 1) that comes with Kafka. This tool is mainly used to replicate data from one data center to another. It is build on the assumption that the topic names stay the same in both clusters.
However, you can provide your customised MessageHandler
that renames a topic.
package org.xxx.java;
import java.util.Collections;
import java.util.List;
import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* An example implementation of MirrorMakerMessageHandler that allows to rename topic.
*/
public class TopicRenameHandler implements MirrorMaker.MirrorMakerMessageHandler {
private final String newName;
public TopicRenameHandler(String newName) {
this.newName = newName;
}
public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
return Collections.singletonList(new ProducerRecord<byte[], byte[]>(newName, record.partition(), record.key(), record.value()));
}
}
I used the following dependencies in my pom.xml
file
<properties>
<kafka.version>2.5.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
Compile the code above and make sure to add your class into the CLASSPATH
export CLASSPATH=$CLASSPATH:/.../target/MirrorMakerRenameTopics-1.0.jar
Now, together with some basic consumer.properties
bootstrap.servers=localhost:9092
client.id=mirror-maker-consumer
group.id=mirror-maker-rename-topic
auto.offset.reset=earliest
and producer.properties
bootstrap.servers=localhost:9092
client.id=mirror-maker-producer
you can call the kafka-mirror-maker
as below
kafka-mirror-maker --consumer.config /path/to/consumer.properties \
--producer.config /path/to/producer.properties \
--num.streams 1 \
--whitelist="topicToBeRenamed" \
--message.handler org.xxx.java.TopicRenameHandler \
--message.handler.args "newTopicName"
Please note the following two caveats with this approach:
- As you are planning to change the number of partitions the ordering of the messages within the new topic might be different compared to the old topic. Messages are getting partitioned by the key in Kafka by default.
- Using the MirrorMaker will not copy your existing offsets in the old topic but rather start writing new offsets. So, there will be (almost) no relation between the offsets from the old topic to the offsets of the new topic.