29
votes

I am new to kafka so apology if I sound stupid but what I understood so far is .. A stream of message can be defined as a topic, like a category. And every topic is divided into one or more partitions (each partition can have multiple replicas). so they act in parallel

From the Kafka main site they say

The producer is able to chose which message to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the message).

Does this mean while consuming I will be able to choose the message offset from particular partition? While running multiple partitions is it possible to choose from one specific partition i.e partition 0?

In Kafka 0.7 quick start they say

Send a message with a partition key. Messages with the same key are sent to the same partition.

And the key can be provided while creating the producer as below

    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-key", "test-message");
    producer.send(data);

Now how do I consume message based on this key? what is the actual impact of using this key while producing in Kafka ?

While creating producer in 0.8beta we can provide the partitioner class attribute through the config file. The custom partitioner class can be perhaps created implementing the kafka partitioner interface. But m little confused how exactly it works. 0.8 doc also does not explain much. Any advice or m i missing something ?

3

3 Answers

19
votes

Each topic in Kafka is split into many partitions. Partition allows for parallel consumption increasing throughput.

Producer publishes the message to a topic using the Kafka producer client library which balances the messages across the available partitions using a Partitioner. The broker to which the producer connects to takes care of sending the message to the broker which is the leader of that partition using the partition owner information in zookeeper. Consumers use Kafka’s High-level consumer library (which handles broker leader changes, managing offset info in zookeeper and figuring out partition owner info etc implicitly) to consume messages from partitions in streams; each stream may be mapped to a few partitions depending on how the consumer chooses to create the message streams.

For example, if there are 10 partitions for a topic and 3 consumer instances (C1,C2,C3 started in that order) all belonging to the same Consumer Group, we can have different consumption models that allow read parallelism as below

Each consumer uses a single stream. In this model, when C1 starts all 10 partitions of the topic are mapped to the same stream and C1 starts consuming from that stream. When C2 starts, Kafka rebalances the partitions between the two streams. So, each stream will be assigned to 5 partitions(depending on the rebalance algorithm it might also be 4 vs 6) and each consumer consumes from its stream. Similarly, when C3 starts, the partitions are again rebalanced between the 3 streams. Note that in this model, when consuming from a stream assigned to more than one partition, the order of messages will be jumbled between partitions.

Each consumer uses more than one stream (say C1 uses 3, C2 uses 3 and C3 uses 4). In this model, when C1 starts, all the 10 partitions are assigned to the 3 streams and C1 can consume from the 3 streams concurrently using multiple threads. When C2 starts, the partitions are rebalanced between the 6 streams and similarly when C3 starts, the partitions are rebalanced between the 10 streams. Each consumer can consume concurrently from multiple streams. Note that the number of streams and partitions here are equal. In case the number of streams exceed the partitions, some streams will not get any messages as they will not be assigned any partitions.

18
votes

This is what I've found so far ..

Define our own custom partitioner class by implementing the kafka Partitioner interface. The implemented method will have two arguments, first the key that we provide from the producer and next the number of partition available. So we can define our own logic to set which key of message goes to what partition.

Now while creating the producer we can specify our own partitioner class using the "partitioner.class" attribute

    props.put("partitioner.class", "path.to.custom.partitioner.class");

If we don't mention it then Kafka will use its default class and try to distribute message evenly among the partitions available.

Also inform Kafka how to serialize the key

    props.put("key.serializer.class", "kafka.serializer.StringEncoder");

Now if we send some message using a key in the producer the message will be delivered to a specific partition (based on our logic written on the custom partitioner class), and in the consumer (SimpleConsumer) level we can specify the partition to retrieve the specific messages.

In case we need to pass a String as a key, the same should be handled in the custom partitioner class ( take hash value of the key and then take first two digit etc )

6
votes

Does this mean while consuming I will be able to choose the message offset from particular partition? While running multiple partitions is it possible to choose from one specific partition i.e partition 0?

Yes you can choose message from one specific partition from your consumer but if you want that to be identified dynamically then it depends on the logic how you have implemented Partitioner Class in your producer.

Now how do I consume message based on this key? what is the actual impact of using this key while producing in Kafka ?

There are two way of consuming the message. One is using Zookeeper Host and another is Static Host. Zookeper host consumes message from all partition. However if you are uisng Static Host than you can provide broker with partition number that needs to be consumed.

Please check below example of Kafka 0.8

Producer

KeyedMessage<String, String> data = new KeyedMessage<String, String>(<<topicName>>, <<KeyForPartition>>, <<Message>>);

Partition Class

   public int partition(Object arg0, int arg1) {
        // arg0 is the key given while producing, arg1 is the number of
        // partition the broker has
        long organizationId = Long.parseLong((String) arg0);
        // if the given key is less than the no of partition available then send
        // it according to the key given Else send it to the last partition
        if (arg1 < organizationId) {

            return (arg1 - 1);
        }
        // return (int) (organizationId % arg1);
        return Integer.parseInt((String) arg0);
    }

So the partiotioner class decide where to send message based on your logic.

Consumer (PN:I have used Storm Kafka 0.8 integration)

        HostPort hosts = new HostPort("10.**.**.***",9092);

        GlobalPartitionInformation gpi = new GlobalPartitionInformation();
        gpi.addPartition(0, hosts);
        gpi.addPartition(2, hosts);

        StaticHosts statHost = new StaticHosts(gpi);

        SpoutConfig spoutConf = new SpoutConfig(statHost, <<topicName>>, "/kafkastorm", <<spoutConfigId>>);