I am using KafkaProducer from the kafka-clients 1.0.0 library. According to the documentation, the method Future<RecordMetadata> send(ProducerRecord<K, V> record) returns immediately, but in practice it does not seem to. send() calls another method in the same class, doSend (snippet below), and inside doSend it waits for the topic's metadata. I assume that wait is necessary, because the metadata is needed to determine partitions and so on.

/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // ... (remainder of doSend() omitted)

Are there any options that make it fully asynchronous? The reason I want it to be fully asynchronous is that if some of the servers in bootstrap.servers are not responding, send() waits for up to max.block.ms. I don't want it to wait at all; I just want it to return.

This is the documentation that says send() returns immediately (KafkaProducer Javadoc):

The send is asynchronous and this method will return immediately once the record has been stored in the buffer of records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one.

2 Answers

Answer 1 (4 votes)

Your analysis is correct: Kafka's "non-blocking" send() API can sometimes block. This has been raised before in KIP-286 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update) but has never been prioritized.
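One possible workaround, sketched here under stated assumptions, is to move the potentially blocking call off the caller's thread. The sketch assumes a callback-style send; `CallbackSend` and `OffloadingSender` are hypothetical names, and with a real producer you might pass something like `(rec, cb) -> producer.send(rec, cb::accept)`. It uses only JDK types so it can be tried without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical callback-style send, e.g. (rec, cb) -> producer.send(rec, cb::accept)
@FunctionalInterface
interface CallbackSend<R, M> {
    void send(R record, BiConsumer<M, Exception> onCompletion);
}

// Hypothetical wrapper: runs each (possibly blocking) send on one dedicated thread,
// so the caller gets a CompletableFuture back at once.
class OffloadingSender<R, M> implements AutoCloseable {
    private final CallbackSend<R, M> sender;
    // A single worker keeps records in the order they were submitted.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    OffloadingSender(CallbackSend<R, M> sender) {
        this.sender = sender;
    }

    CompletableFuture<M> send(R record) {
        CompletableFuture<M> result = new CompletableFuture<>();
        try {
            executor.execute(() -> {
                try {
                    // Any metadata wait now blocks this worker thread, not the caller.
                    sender.send(record, (metadata, error) -> {
                        if (error != null) {
                            result.completeExceptionally(error);
                        } else {
                            result.complete(metadata);
                        }
                    });
                } catch (RuntimeException e) {
                    // send() itself threw (e.g. a timeout or a serialization error).
                    result.completeExceptionally(e);
                }
            });
        } catch (RejectedExecutionException e) {
            // The wrapper has already been closed.
            result.completeExceptionally(e);
        }
        return result;
    }

    @Override
    public void close() {
        executor.shutdown();
        try {
            // Give already-queued sends a chance to reach the underlying sender.
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The trade-off is that the executor's queue is unbounded: if the brokers stay unreachable, pending records accumulate in memory instead of blocking the caller, so a bounded queue (or a limit on in-flight futures) would be a sensible refinement.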

Answer 2 (0 votes)

It's as asynchronous as it can be. The producer maintains a cache of cluster metadata that is refreshed periodically (see metadata.max.age.ms) to keep it current, and in your scenario send() only waits if that cache is stale or not yet initialized. Once the cache is initialized, there is no wait.
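To illustrate the behaviour described above, here is a toy model (not Kafka's actual code) of a per-topic metadata cache: a lookup returns immediately on a hit and otherwise waits for an update, for at most `maxBlockMs`, which is roughly the shape of `waitOnMetadata` in the question. `ToyMetadataCache` and its methods are hypothetical, and the model covers only the not-yet-initialized case.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

// Toy model, NOT Kafka's implementation: lookups wait only while a topic's metadata
// has never been fetched, and never longer than maxBlockMs.
class ToyMetadataCache {
    private final Map<String, Integer> partitionCounts = new HashMap<>();
    private final long maxBlockMs;

    ToyMetadataCache(long maxBlockMs) {
        this.maxBlockMs = maxBlockMs;
    }

    // Called by a background thread when a metadata response arrives.
    synchronized void update(String topic, int partitions) {
        partitionCounts.put(topic, partitions);
        notifyAll(); // wake any caller waiting for this topic
    }

    // Cache hit: returns immediately. Cache miss: waits for update() or the deadline.
    synchronized int awaitPartitions(String topic) throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + maxBlockMs;
        Integer count = partitionCounts.get(topic);
        while (count == null) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("no metadata for " + topic + " after " + maxBlockMs + " ms");
            }
            wait(remaining);
            count = partitionCounts.get(topic);
        }
        return count;
    }
}
```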

If your code has a single upcoming send() that must execute as quickly as possible, you might try calling the producer's partitionsFor() method beforehand to force the cache to be populated if needed.
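A sketch of such a warm-up step at application startup, assuming it is called with `producer::partitionsFor`. `MetadataWarmUp` is a hypothetical helper, typed against `java.util.function.Function` so the example runs without Kafka on the classpath. Topics whose lookup failed (for example with a timeout because no broker answered) are returned so the caller can decide whether to retry.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Hypothetical startup helper: look up each topic once so the producer's metadata
// cache is populated before latency-sensitive send() calls.
class MetadataWarmUp {
    // partitionsFor is assumed to be producer::partitionsFor; any function that
    // blocks until a topic's metadata is known would work the same way.
    static List<String> warmUp(Function<String, ? extends List<?>> partitionsFor,
                               Collection<String> topics) {
        List<String> failed = new ArrayList<>();
        for (String topic : topics) {
            try {
                // With a real producer this may block for up to max.block.ms.
                partitionsFor.apply(topic);
            } catch (RuntimeException e) {
                // e.g. a timeout because no broker answered in time
                failed.add(topic);
            }
        }
        return failed;
    }
}
```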

Aside from that, there will always be the potential for an occasional wait while the metadata cache is refreshed.
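If a wait cannot be avoided, it can at least be bounded. A minimal sketch, assuming string keys and values: lowering `max.block.ms` (default 60000 ms) caps how long `send()` and `partitionsFor()` may block, at the cost that a send which cannot obtain metadata within that time fails with a `TimeoutException` instead of waiting longer. `BoundedBlockingConfig` is a hypothetical helper name.

```java
import java.util.Properties;

// Hypothetical helper: producer settings that cap how long send() may block.
class BoundedBlockingConfig {
    static Properties producerProps(String bootstrapServers, long maxBlockMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Upper bound on blocking in send()/partitionsFor() (default 60000 ms).
        props.put("max.block.ms", Long.toString(maxBlockMs));
        // String serializers are an assumption made for this example.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

For example, `new KafkaProducer<String, String>(BoundedBlockingConfig.producerProps("broker1:9092", 1000))` would give up on metadata after about one second.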