Is there a way to determine the number of batches that were created by a Kafka producer for a specific set of messages? For instance if I am sending 10K messages in a loop , is there a way to check how many batches were sent? I set the "batch.size" to a high value and my expectation was that the message will be buffered and there will be a delay in seeing the message in my consumer. However this seems to be printed almost immediately in my consumer program.
The default value if batch.size is 16384. Is this number of bytes?
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KafkaProducerApp {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks","0");
properties.put("batch.size",33554432);
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
Map<Integer,Integer> partitionCount = new HashMap<Integer,Integer>();
partitionCount.put(0,0);
partitionCount.put(1,0);
partitionCount.put(2,0);
try{
Date from = new Date();
for(int i=0;i<10000;i++) {
RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("test_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
//RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String,String>("test_topic",0,Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
System.out.println(" Offset = " + ack.offset());
System.out.println(" Partition = " + ack.partition());
partitionCount.put(ack.partition(),partitionCount.get(ack.partition())+1);
}
Date to = new Date();
System.out.println(" partition 0 =" + partitionCount.get(0));
System.out.println(" partition 1 =" + partitionCount.get(1));
System.out.println(" partition 2 =" + partitionCount.get(2));
System.out.println(" Elapsed Time = " + (to.getTime()-from.getTime())/1000);
} catch (Exception ex){
ex.printStackTrace();
} finally {
kafkaProducer.close();
}
}
}