176
votes

One of the first things I think about when using a new service (such as a non-RDBMS data store or a message queue) is: "How should I structure my data?".

I've read and watched some introductory materials. Take, for example, the paper "Kafka: a Distributed Messaging System for Log Processing", which says:

  • "a Topic is the container with which messages are associated"
  • "the smallest unit of parallelism is the partition of a topic. This implies that all messages that ... belong to a particular partition of a topic will be consumed by a consumer in a consumer group."

Knowing this, what would be a good example that illustrates how to use topics and partitions? When should something be a topic? When should something be a partition?

As an example, let's say my (Clojure) data looks like:

{:user-id 101 :viewed "/page1.html" :at #inst "2013-04-12T23:20:50.22Z"}
{:user-id 102 :viewed "/page2.html" :at #inst "2013-04-12T23:20:55.50Z"}

Should the topic be based on user-id? viewed? at? What about the partition?

How do I decide?

4
Strange this talks about topics and partitions, but not necessarily evolution of the data within them. What if you wanted to attach user agents or headers to those "user view" events? How do you evolve and communicate that in a way to downstream consumers? – OneCricketeer
@OneCricketeer Sounds like a separate question to me :) Go for it... – David J.

4 Answers

141
votes

When structuring your data for Kafka, it really depends on how it's meant to be consumed.

In my mind, a topic is a grouping of messages of a similar type that will be consumed by the same type of consumer. So in the example above, I would just have a single topic; if you later decide to push some other kind of data through Kafka, you can add a new topic for that.

Topics are registered in ZooKeeper, which means that you might run into issues if you try to add too many of them, e.g. if you have a million users and decide to create a topic per user.

Partitions, on the other hand, are a way to parallelize the consumption of messages. The number of partitions of a topic needs to be at least the number of consumers in a consumer group for the partitioning feature to make sense; any consumers beyond that number have nothing to read. Consumers in a consumer group split the burden of processing the topic between themselves according to the partitioning, so that each consumer is only concerned with messages in the partitions it is "assigned to".
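To make the consumer-group behaviour concrete, here is a minimal sketch of how partitions might be divided among the consumers in a group. It is a plain round-robin written for illustration only; Kafka's real assignment strategies differ in detail, and the function name `assign_partitions` and the consumer names are made up for this example.

```python
def assign_partitions(partitions, consumers):
    """Illustrative round-robin: every partition goes to exactly one consumer."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Four partitions shared by two consumers: each consumer gets two partitions.
balanced = assign_partitions([0, 1, 2, 3], ["c1", "c2"])

# Two partitions but three consumers: one consumer ends up with nothing to read.
starved = assign_partitions([0, 1], ["c1", "c2", "c3"])
```

In the second case `c3` is assigned no partitions, which is why adding consumers beyond the partition count does not add parallelism.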

The partition can either be chosen explicitly using a partition key on the producer side or, if no key is provided, a random partition is selected for every message.
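As a rough sketch of what producer-side partition selection looks like: a keyed message is mapped to a partition by hashing the key and taking the result modulo the partition count, while an unkeyed message gets a random partition. The function name `choose_partition` is hypothetical, and `zlib.crc32` is only a stand-in for whatever hash function the client library actually uses.

```python
import random
import zlib


def choose_partition(key, num_partitions):
    """Pick a partition: stable hash for keyed messages, random otherwise."""
    if key is None:
        # No key given: any partition will do.
        return random.randrange(num_partitions)
    # crc32 is an assumption for illustration, not the hash Kafka clients use.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


# The same key always lands in the same partition.
first = choose_partition(101, 8)
second = choose_partition(101, 8)
```

The important property is determinism: all messages with `:user-id 101` go to one partition, so whichever consumer owns that partition sees all of that user's events in order.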

63
votes

Once you know how to partition your event stream, the topic name will be easy, so let's answer that question first.

@Ludd is correct - the partition structure you choose will depend largely on how you want to process the event stream. Ideally you want a partition key that makes your event processing partition-local.

For example:

  1. If you care about users' average time-on-site, then you should partition by :user-id. That way, all the events related to a single user's site activity will be available within the same partition. This means that a stream processing engine such as Apache Samza can calculate average time-on-site for a given user just by looking at the events in a single partition. This avoids having to perform any kind of costly partition-global processing.
  2. If you care about the most popular pages on your website, you should partition by the :viewed page. Again, Samza will be able to keep a count of a given page's views just by looking at the events in a single partition.

Generally, we are trying to avoid having to rely on global state (such as keeping counts in a remote database like DynamoDB or Cassandra), and instead be able to work using partition-local state. This is because local state is a fundamental primitive in stream processing.
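A small simulation may help show what "partition-local" buys you. The sketch below partitions some made-up page-view events by the viewed page and then counts views inside each partition independently. The event data, the partition count of 4, and the `crc32`-based hash are all assumptions for illustration; no stream processor or Kafka client is involved.

```python
import zlib
from collections import Counter, defaultdict

# Hypothetical events, mirroring the shape of the data in the question.
events = [
    {"user-id": 101, "viewed": "/page1.html"},
    {"user-id": 102, "viewed": "/page2.html"},
    {"user-id": 101, "viewed": "/page2.html"},
    {"user-id": 103, "viewed": "/page1.html"},
]


def partition_by(events, field, num_partitions):
    """Group events into partitions by hashing the chosen field."""
    partitions = defaultdict(list)
    for event in events:
        index = zlib.crc32(str(event[field]).encode("utf-8")) % num_partitions
        partitions[index].append(event)
    return partitions


# Each partition keeps its own counter; no shared or remote state is needed.
by_page = partition_by(events, "viewed", 4)
per_partition_counts = {
    index: Counter(event["viewed"] for event in partition_events)
    for index, partition_events in by_page.items()
}
```

Because every event for a given page lands in the same partition, each page's count lives in exactly one local counter and never has to be merged with counts held elsewhere.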

If you need both of the above use-cases, then a common pattern with Kafka is to first partition by, say, :user-id, and then to re-partition by :viewed, ready for the next phase of processing.
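The re-partitioning pattern can be sketched as two stages, where the second stage reads every partition of the first topic and republishes each event under a different key. Topics are modelled here as plain dictionaries of lists, and the names `publish`, `events_by_user` and `events_by_viewed`, the partition count, and the `crc32` hash are all illustrative assumptions rather than Kafka APIs.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical partition count for both topics


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Stable key-to-partition mapping (crc32 is a stand-in hash)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def publish(topic, events, key_field):
    """Append each event to the partition selected by its key field."""
    for event in events:
        topic[partition_for(event[key_field])].append(event)


events = [
    {"user-id": 101, "viewed": "/page1.html"},
    {"user-id": 102, "viewed": "/page2.html"},
    {"user-id": 101, "viewed": "/page2.html"},
]

# Stage 1: the raw stream is keyed by user, for per-user processing.
events_by_user = defaultdict(list)
publish(events_by_user, events, "user-id")

# Stage 2: a job reads stage 1 and republishes keyed by page, for per-page processing.
events_by_viewed = defaultdict(list)
for partition_events in events_by_user.values():
    publish(events_by_viewed, partition_events, "viewed")
```

The cost of this pattern is that every event is written twice, but each stage can then do its work with purely local state.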

On topic names - an obvious one here would be events or user-events. To be more specific, you could go with events-by-user-id and/or events-by-viewed.

8
votes

This is not exactly related to the question, but in case you have already decided upon the logical segregation of records based on topics, and want to optimize the topic/partition count in Kafka, this blog post might come in handy.

Key takeaways in a nutshell:

  • In general, the more partitions there are in a Kafka cluster, the higher the throughput one can achieve. Let the max throughput achievable on a single partition for production be p and consumption be c. Let’s say your target throughput is t. Then you need to have at least max(t/p, t/c) partitions.

  • Currently, in Kafka, each broker opens a file handle of both the index and the data file of every log segment. So, the more partitions, the higher that one needs to configure the open file handle limit in the underlying operating system. E.g. in our production system, we once saw an error saying too many files are open, while we had around 3600 topic partitions.

  • When a broker is shut down uncleanly (e.g., kill -9), the observed unavailability could be proportional to the number of partitions.

  • The end-to-end latency in Kafka is defined by the time from when a message is published by the producer to when the message is read by the consumer. As a rule of thumb, if you care about latency, it’s probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in a Kafka cluster and r is the replication factor.
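The two formulas in the takeaways above can be turned into a quick calculation. This is only a worked example: the function names are made up, and the throughput figures (1000, 50 and 100, in whatever unit you measure, such as MB/s) and the cluster size are hypothetical numbers, not measurements.

```python
import math


def min_partitions(target, produce_per_partition, consume_per_partition):
    """Lower bound on partition count: max(t/p, t/c), rounded up."""
    return max(
        math.ceil(target / produce_per_partition),
        math.ceil(target / consume_per_partition),
    )


def latency_rule_of_thumb(brokers, replication_factor):
    """The 100 x b x r figure quoted in the rule of thumb above."""
    return 100 * brokers * replication_factor


# Hypothetical numbers: t = 1000, p = 50, c = 100 gives max(20, 10).
needed = min_partitions(1000, 50, 100)

# Hypothetical cluster: 3 brokers, replication factor 2.
cap = latency_rule_of_thumb(3, 2)
```

With these inputs the throughput bound is 20 partitions, set by the producer side because it is the slower of the two per-partition rates, and the rule-of-thumb figure is 600.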

5
votes

I think of a topic name as a label for one kind of message: producers publish messages to the topic, and consumers receive them by subscribing to it.

A topic can have many partitions. Partitions are good for parallelism. A partition is also the unit of replication, so in Kafka the leader and follower roles are defined at the partition level. A partition is essentially an ordered queue, ordered by message arrival, and, put simply, a topic is composed of one or more such queues. This is useful when modelling our structure.

Kafka was developed at LinkedIn for log aggregation and delivery, and that scenario makes a good example.

A user's events on your website or app can be logged by your web server and then sent to a Kafka broker through a producer. In the producer, you can specify the partitioning method, for example by event type (different events are saved in different partitions), by event time (dividing a day into periods according to your app logic), by user type, or with no logic at all, simply balancing the logs across the partitions.

For the case in your question, you can create one topic called "page-view-event" and create N partitions, using hashed keys to distribute the logs evenly across all partitions. Or you could choose your own partitioning logic to distribute the logs as you see fit.