3 votes

When we produce messages, we can define a callback; this callback receives an exception if the send failed:

kafkaProducer.send(producerRecord, new Callback() {
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e == null) {
      // the record was sent successfully
    } else {
      // the send failed
    }
  }
});

Considering the built-in retry logic in the producer, I wonder which kinds of exceptions developers should handle explicitly?


3 Answers

4 votes

According to the Callback Java Docs, the following exceptions may occur during the callback:

The exception thrown during processing of this record. Null if no error occurred. Possible thrown exceptions include:

Non-Retriable exceptions (fatal, the message will never be sent):

  • InvalidTopicException
  • OffsetMetadataTooLargeException
  • RecordBatchTooLargeException
  • RecordTooLargeException
  • UnknownServerException

Retriable exceptions (transient, may be covered by increasing retries):

  • CorruptRecordException
  • InvalidMetadataException
  • NotEnoughReplicasAfterAppendException
  • NotEnoughReplicasException
  • OffsetOutOfRangeException
  • TimeoutException
  • UnknownTopicOrPartitionException
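Since the retriable exceptions above all extend Kafka's `org.apache.kafka.common.errors.RetriableException`, one way to branch on this in the callback is to check the exception hierarchy rather than enumerating every class. This is only a sketch of the classification, not a complete error-handling strategy:

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;

// Sketch: classify the callback exception via the Kafka exception hierarchy.
Callback classifyingCallback = new Callback() {
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e == null) {
      return; // record acknowledged successfully
    }
    if (e instanceof RetriableException) {
      // transient: the producer has likely already retried up to its limits;
      // log it, count it for alerting, or re-send from the application if needed
    } else {
      // fatal for this record: it will never be sent, so e.g. route it
      // to a dead-letter store for inspection
    }
  }
};
```

Note that by the time the callback fires, the producer's internal retries are already exhausted, so "retriable" here mainly tells you whether re-sending from the application could plausibly succeed.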

This may be an unsatisfactory answer, but in the end, which exceptions to handle and how to handle them depends entirely on your use case and business requirements.

Handling Producer Retries

However, as a developer you also need to deal with the retry mechanism of the Kafka producer itself. The retries are mainly driven by:

retries: Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection (default: 5) to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.

retry.backoff.ms: The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

request.timeout.ms: The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

The recommendation is to keep the default values of those three configurations above and rather focus on the hard upper time limit defined by

delivery.timeout.ms: An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed for retriable send failures. The producer may report failure to send a record earlier than this config if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch which reached an earlier delivery expiration deadline. The value of this config should be greater than or equal to the sum of request.timeout.ms and linger.ms.
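Putting those recommendations together, a minimal configuration sketch might look like the following. It uses the string config keys as they appear in the Kafka docs, the default values for the timeouts, and an assumed local `bootstrap.servers`; it also checks the invariant stated above:

```java
import java.util.Properties;

public class ProducerTimeoutConfig {
  public static Properties baseConfig() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
    // Leave retries at its default and bound the total send time instead:
    props.put("delivery.timeout.ms", "120000"); // hard upper limit per send()
    props.put("request.timeout.ms", "30000");   // per-request wait for a response
    props.put("linger.ms", "0");                // batching delay before sending
    // Invariant from the docs:
    // delivery.timeout.ms >= request.timeout.ms + linger.ms
    long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms"));
    long request  = Long.parseLong(props.getProperty("request.timeout.ms"));
    long linger   = Long.parseLong(props.getProperty("linger.ms"));
    if (delivery < request + linger) {
      throw new IllegalStateException("delivery.timeout.ms too small");
    }
    return props;
  }

  public static void main(String[] args) {
    System.out.println(baseConfig().getProperty("delivery.timeout.ms"));
  }
}
```

If you do need a tighter bound on how long a send can block, lower `delivery.timeout.ms` while keeping that inequality intact.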

0 votes

You may get a BufferExhaustedException or a TimeoutException.

Just bring your Kafka broker down after the producer has produced one record, and then continue producing records. After some time, you should see exceptions in the callback.

This is because when you sent the first record, the metadata was fetched; after that, the records are batched and buffered, and they eventually expire after some timeout, at which point you may see these exceptions.

I suppose that the timeout is delivery.timeout.ms, which, when it expires, gives you a TimeoutException there.
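The experiment above can be sketched as follows, assuming a local broker on `localhost:9092` that you stop manually during the pause, and a hypothetical topic name `test-topic`; `delivery.timeout.ms` is shortened so the buffered records expire quickly:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExpiryDemo {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    props.put("delivery.timeout.ms", "5000"); // expire buffered records quickly
    props.put("request.timeout.ms", "3000");  // must stay below delivery timeout

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // First send fetches the metadata while the broker is still up.
    producer.send(new ProducerRecord<>("test-topic", "k", "v1"),
        (metadata, e) -> System.out.println("first: " + e));
    Thread.sleep(10_000); // stop the broker during this pause
    // This record is batched and buffered; once delivery.timeout.ms expires,
    // the callback should receive a TimeoutException.
    producer.send(new ProducerRecord<>("test-topic", "k", "v2"),
        (metadata, e) -> System.out.println("second: " + e));
    producer.flush();
    producer.close();
  }
}
```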

0 votes

To add more info to @Mike's answer, I think only a few exceptions are enumerated in the Callback interface's Javadoc.

Here you can see the whole list: kafka.common.errors

And here, you can see which ones are retriables and which ones are not: kafka protocol guide

And the code could be something like this:

producer.send(record, callback)

def callback: Callback = new Callback {
    override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
      if (e != null) {
         // match on the exception's type, not on the companion object
         e match {
           case _: RecordTooLargeException | _: UnknownServerException /* | ... */ =>
             log.error("Winter is coming") // it's non-retriable
             writeDiscardRecordsToSomewhereElse
           case _ =>
             log.warn("It's not that cold") // it's retriable
         }
      } else {
        log.debug("It's summer. Everything is fine")
      }
    }
}