3
votes

I need to catch exceptions when sending to Kafka asynchronously. The Kafka producer API provides the method send(ProducerRecord record, Callback callback). But when I tested this against the following two scenarios:

  • Kafka Broker Down
  • Topic not pre-created

In both cases the callbacks are not getting called. Instead, I am getting a warning in the code for the unsuccessful send (as shown below).

Questions:

  • So are the callbacks called only for specific exceptions?

  • When does the Kafka client try to connect to the Kafka broker during an async send: on every batch send, or periodically?

[Kafka warning image]

Note: I am also using a linger.ms setting of 25 seconds to batch-send my records.


import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerDemo {

    static KafkaProducer<String, String> producer;

    public static void main(String[] args) throws IOException {

        final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");

        producer = new KafkaProducer<String, String>(properties);
        String topic = "first_topic";

        for (int i = 0; i < 5; i++) {
            String value = "hello world " + Integer.toString(i);
            String key = "id_" + Integer.toString(i);

            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);

            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // executed every time a record is successfully sent or an exception is thrown
                    if (e == null) {
                        // no exception: record was acknowledged
                    } else {
                        // exception handling
                    }
                }
            });
        }
        producer.close();
    }
}

3 Answers

0
votes

For the first question, here is the answer: as per the Apache Kafka documentation, you can capture the exceptions described at the link below in the onCompletion method when you implement the Callback interface:

https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/Callback.html

For the second question, the combination of the properties below controls when the records are sent, and as far as I understand, it is the same for synchronous and asynchronous calls:

  • linger.ms
  • max.block.ms

https://kafka.apache.org/documentation/#linger.ms
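
For illustration, both can be set in the same Properties object as the other settings in the question; the values below are arbitrary examples chosen for this sketch, not recommendations:

        // Arbitrary example values, not recommendations.
        // linger.ms: how long the producer waits to fill a batch before sending it.
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "25000");
        // max.block.ms: how long send()/partitionsFor() may block waiting for metadata or buffer space.
        properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");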

0
votes

So are the callbacks called only for specific exceptions?

Yes, that's how it works. From the documentation (2.5.0):

     * Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
     * will be invoked when the request is complete.

Notice the important part: when the request is complete, which means that the producer must have accepted the record and sent the ProduceRequest to the Kafka broker. Without digging too deep into the internals, this means that the broker metadata must be present and the partition must exist.

When it comes to a formal specification, you'd need to take a good look at send()'s Javadoc and possibly at KafkaProducer's implementation of the doSend method. There you will see that multiple exceptions can be thrown in the submitting call itself (instead of returning a future and invoking the callback), e.g. (see the sketch after the list):

  • if broker metadata is not available within the given timeout,
  • if the data could not be serialized,
  • if the serialized form was too large, etc.
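
A rough sketch of handling both paths (not the original poster's code; exactly which exceptions take which path depends on the client version, so treat the comments as assumptions based on send()'s Javadoc):

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendExceptionPathsDemo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("first_topic", "id_0", "hello world 0");
            try {
                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            // asynchronous path: the producer accepted the record,
                            // but delivery to the broker eventually failed
                            e.printStackTrace();
                        }
                    }
                });
            } catch (Exception e) {
                // synchronous path: send() itself threw in the calling thread
                // (e.g. serialization problems, or blocking on metadata/buffer timed out)
                e.printStackTrace();
            }
        }
    }
}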
0
votes

You will get those warnings for a non-existing topic as part of a resilience mechanism provided by the KafkaProducer. If you wait a bit longer (60 seconds by default), the callback will be called eventually.
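
As a rough illustration of that behaviour (not the answerer's original snippet; this sketch assumes a KafkaProducer<String, String> named producer configured as in the question, and a made-up topic name that does not exist on the broker):

        // Assumed: "producer" is configured as in the question; the topic name is made up.
        ProducerRecord<String, String> record =
                new ProducerRecord<>("topic_that_does_not_exist", "id_0", "hello world");

        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    // With default timeouts this tends to arrive only after
                    // roughly a minute, not immediately.
                    System.err.println("Send eventually failed: " + e);
                }
            }
        });

        // Wait for outstanding sends to be resolved so the program does not
        // exit before the callback has had a chance to run.
        producer.flush();
        producer.close();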

So, when something goes wrong and the async send is not successful, it will eventually fail with a failed future and/or a callback with an exception. If you are not running it transactionally, this can still mean that some messages from the batch have found their way to the broker while others haven't. It will most certainly be a problem if you need a blocking-style acknowledgement to the upstream system (like an HTTP ingestion interface, etc.) for every message that is sent to Kafka. The only way to do that is to block on every message with the future's get, as described in the documentation.
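
A minimal sketch of that blocking style, assuming the same producer and record variables as above (java.util.concurrent.ExecutionException is needed for the catch; the handling shown is illustrative):

        // Blocking per-message send: get() waits for the broker's acknowledgement,
        // so a delivery failure surfaces here as an exception for exactly this record.
        try {
            RecordMetadata metadata = producer.send(record).get();
            // safe to acknowledge the upstream system for this message
        } catch (ExecutionException e) {
            // delivery failed; e.getCause() carries the underlying Kafka exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

Blocking on every send obviously trades throughput for the per-message guarantee.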

In general, I've noticed a lot of questions related to KafkaProducer delivery semantics and guarantees. It can definitely be documented better.

One more thing, since you mentioned linger.ms:

Note that records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load batching will occur regardless of the linger configuration