3
votes

I'm looking for a way to redistribute messages from one Kafka topic to another. The original topic has 20 partitions with 1,000,000 messages per partition. I want a new topic with 1000 partitions, with the messages spread across the wider partition range.

1T -> 20P -> 1000000 messages per partition (total 20m/topic)
2T -> 1000P -> 20000 messages per partition (total 20m/topic)

Is it possible to do that in Kafka (via topic mirroring or some other technique)?

1 Answer

1
votes

You could use MirrorMaker (version 1), which ships with Kafka. This tool is mainly used to replicate data from one data center to another. It is built on the assumption that the topic names stay the same in both clusters.

However, you can provide your own custom MessageHandler that renames the topic:

package org.xxx.java;

import java.util.Collections;
import java.util.List;
import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;


/**
 * An example implementation of MirrorMakerMessageHandler that renames the topic.
 */
public class TopicRenameHandler implements MirrorMaker.MirrorMakerMessageHandler {
  private final String newName;

  public TopicRenameHandler(String newName) {
    this.newName = newName;
  }

  public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
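    // No partition is set here, so the producer's partitioner spreads the records
    // across all partitions of the new topic. Passing record.partition() would
    // confine them to the source topic's partition numbers (0-19 in the question).
    return Collections.singletonList(new ProducerRecord<byte[], byte[]>(newName, record.key(), record.value()));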
  }
}

I used the following dependencies in my pom.xml file:

    <properties>
        <kafka.version>2.5.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

Compile the code above and add the resulting JAR to the CLASSPATH:

export CLASSPATH=$CLASSPATH:/.../target/MirrorMakerRenameTopics-1.0.jar

Now, together with some basic consumer.properties

bootstrap.servers=localhost:9092
client.id=mirror-maker-consumer
group.id=mirror-maker-rename-topic
auto.offset.reset=earliest

and producer.properties

bootstrap.servers=localhost:9092
client.id=mirror-maker-producer

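you can call kafka-mirror-maker as shown below. Create the new topic with the desired number of partitions beforehand; otherwise it may be auto-created with the broker's default partition count.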

kafka-mirror-maker --consumer.config /path/to/consumer.properties \
 --producer.config /path/to/producer.properties \
 --num.streams 1 \
 --whitelist="topicToBeRenamed" \
 --message.handler org.xxx.java.TopicRenameHandler \
 --message.handler.args "newTopicName"

Please note the following two caveats with this approach:

  • Because you are changing the number of partitions, the ordering of messages in the new topic may differ from the old topic. By default, Kafka chooses the partition from a hash of the message key, so messages with the same key still land in the same partition, but which keys share a partition changes with the partition count.
  • MirrorMaker does not copy the existing offsets from the old topic; it writes the messages with new offsets. So there will be (almost) no relation between the offsets in the old topic and the offsets in the new topic.
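
To illustrate the first caveat, here is a minimal sketch of key-based partitioning. For records with a non-null key, Kafka's default partitioner computes a positive murmur2 hash of the key bytes modulo the partition count. The sketch below uses `Arrays.hashCode` as a stand-in for murmur2, so the partition numbers it prints are not the ones Kafka would pick; the class name `PartitionSketch` and the sample keys are hypothetical. It only shows that the same key always maps to the same partition for a given partition count, while the mapping changes between 20 and 1000 partitions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    // Maps a key to a partition as "positive hash modulo partition count".
    // ASSUMPTION: Arrays.hashCode stands in for Kafka's murmur2 hash, so the
    // resulting numbers differ from what a real Kafka producer would choose.
    static int partitionFor(byte[] key, int numPartitions) {
        int hash = Arrays.hashCode(key);
        // Clear the sign bit so the result of the modulo is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Hypothetical message keys, for illustration only.
        String[] keys = {"order-1", "order-2", "order-3"};
        for (String k : keys) {
            byte[] bytes = k.getBytes(StandardCharsets.UTF_8);
            System.out.println(k
                + " -> partition " + partitionFor(bytes, 20) + " of 20"
                + ", partition " + partitionFor(bytes, 1000) + " of 1000");
        }
    }
}
```

Messages without a key are not hashed this way; the producer distributes them across partitions on its own, so no per-key grouping applies to them.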