3
votes

Let's say I have two brokers.

I read that KafkaProducer creates one producer thread per broker, so I would have two internal threads in this case.

Let's say I have 5 topics and I receive 200 messages per second. How does Kafka perform batching?

Assume batch.size=30 messages, and the first 30 messages to arrive are spread over the topics like this: [topic1=5, topic2=10, topic3=3, topic4=10, topic5=2 messages].

How does Kafka perform batching in this case?

2
I am wondering what the goal of this question is. Why do you want to know these internal implementation details? It is not really possible to predict the exact pattern; there are too many variables involved. There should be no reason to worry about these details. - Matthias J. Sax
Just want to know internal things. Curiosity!!!!! - Gibbs

2 Answers

4
votes

I read that KafkaProducer creates one producer thread per broker, so I would have two internal threads in this case.

Not sure where you got this information from, but it's not correct. A KafkaProducer has a single background thread that writes data asynchronously to the brokers.

How batching happens is hard to predict in detail. It depends on your batch.size, which is a maximum value (and is measured in bytes, not in number of messages). Furthermore, there is the linger.ms parameter, which defines how long to hold data back before sending it (even if batches are not full).
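For reference, here is a minimal sketch of how these two settings might be put into a producer configuration. It uses only java.util.Properties so that it runs without the Kafka client on the classpath. The class name BatchingConfig, the broker addresses, and the chosen values are assumptions for illustration, not recommendations.

```java
import java.util.Properties;

public class BatchingConfig {
    // Builds the batching-related producer settings.
    // All values below are illustrative assumptions.
    public static Properties producerProps() {
        Properties props = new Properties();
        // Hypothetical broker addresses for the two-broker setup in the question
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        // Upper bound for a single batch, in bytes (not a message count)
        props.put("batch.size", "16384");
        // Hold records back for up to 20 ms so that more of them can join a batch
        props.put("linger.ms", "20");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
    }
}
```

In a real application these properties, together with the key and value serializer settings, would be passed to the KafkaProducer constructor.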

In more detail, there will be open network connections to all brokers that host partitions you write to. Furthermore, batching happens per partition; however, multiple batches can be included in a single request to a broker.
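To make the last point concrete, here is a rough sketch of how per-partition batches could be grouped into one request per broker. This is not the producer's actual code. The class RequestGrouping, the method groupByBroker, and the partition-to-leader map are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RequestGrouping {
    // leaderOf: hypothetical lookup from a topic-partition name to its leader broker id.
    // readyPartitions: partitions that currently have a batch ready to send.
    // Returns, per broker id, the partitions whose batches would share one request.
    public static Map<Integer, List<String>> groupByBroker(
            Map<String, Integer> leaderOf, List<String> readyPartitions) {
        Map<Integer, List<String>> perBroker = new TreeMap<>();
        for (String tp : readyPartitions) {
            perBroker.computeIfAbsent(leaderOf.get(tp), k -> new ArrayList<>()).add(tp);
        }
        return perBroker;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaderOf = Map.of("topic1-0", 1, "topic2-0", 2, "topic3-0", 1);
        List<String> ready = List.of("topic1-0", "topic2-0", "topic3-0");
        // prints {1=[topic1-0, topic3-0], 2=[topic2-0]}
        System.out.println(groupByBroker(leaderOf, ready));
    }
}
```

With two brokers, the batches for topic1-0 and topic3-0 travel together in the request to broker 1, while the batch for topic2-0 goes to broker 2.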

2
votes

The RecordAccumulator is responsible for batching values destined for Kafka brokers. When a value is appended, it:

- maintains a ConcurrentMap<TopicPartition, Deque<RecordBatch>>
- gets the Deque for the relevant TopicPartition
- gets the last RecordBatch in the Deque; if that RecordBatch (a byte buffer bounded by batch.size) is not full, appends the value to it
- if the last RecordBatch is null, no RecordBatch exists for that topic partition, so it allocates a new byte buffer
- checks the last RecordBatch again (double-checked locking), in case some other thread has created a RecordBatch in the meantime
- if a RecordBatch now exists, tries appending the value to it
- if the RecordBatch is still null, creates MemoryRecords (backed by the byte buffer)
- adds the MemoryRecords to a new RecordBatch
- appends the value to the RecordBatch (inside MemoryRecords, and eventually the byte buffer)
- adds the RecordBatch to the Deque
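
The steps above can be sketched roughly as follows. This is a simplified stand-in, not Kafka's source: AccumulatorSketch and Batch are hypothetical names, topic partitions are plain strings, and a batch only counts bytes instead of wrapping MemoryRecords. The sketch also creates the new batch while holding the lock on the deque, so it does not need the second check that the real accumulator performs after allocating its buffer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AccumulatorSketch {
    // Simplified batch: tracks bytes used against a fixed capacity.
    static final class Batch {
        private final int capacity;
        private int used = 0;

        Batch(int capacity) { this.capacity = capacity; }

        // Returns false if the value does not fit into the remaining space.
        boolean tryAppend(byte[] value) {
            if (used + value.length > capacity) return false;
            used += value.length;
            return true;
        }
    }

    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchSize; // plays the role of batch.size, in bytes

    public AccumulatorSketch(int batchSize) { this.batchSize = batchSize; }

    public void append(String topicPartition, byte[] value) {
        // Get (or create) the deque for this topic partition
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        synchronized (dq) {
            // Try the last batch first
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(value)) return;
            // No batch yet, or the last one is full: start a new one.
            // A value larger than batchSize gets a batch of its own size.
            Batch fresh = new Batch(Math.max(batchSize, value.length));
            fresh.tryAppend(value);
            dq.addLast(fresh);
        }
    }

    // Number of batches currently queued for a topic partition
    public int batchCount(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        if (dq == null) return 0;
        synchronized (dq) { return dq.size(); }
    }
}
```

For example, with a batch size of 10 bytes, appending three 4-byte values to the same topic partition fills one batch with two values and opens a second batch for the third.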

Class Hierarchy:

RecordBatch
 - MemoryRecords
   - ByteBuffer