1
votes

When producing message to Kafka you can get two kind of errors: retriables and non-retriables. How should you differentiate them when handling them?

I want to produce records asynchronously, saving in another topic (or HBase) those in which callback object receives a nonretriable exception and let the producer handle for me all those that receives a retriable exception (up to a maximum number of attempts and, when it finally reach it, becomes one of the first ones).

My question is: will the producer still handle the retrievable exceptions by itself despite the callback object? Because in the Interface Callback says:

Retriable exceptions (transient, may be covered by increasing # .retries)

Could be the code something like this?

producer.send(record, callback)

def callback: Callback = new Callback {
    override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
      if(null != e) {
         if (e == RecordTooLargeException || e == UnknownServerException || ..) {
            log.error("Winter is comming")
            writeDiscardRecordsToSomewhereElse
         } else {
            log.warn("It's no that cold") //it's retriable. The producer will keep trying by itself?
         }
      } else {
        log.debug("It's summer. Everything is fine")
      }
    }
}

Kafka version: 0.10.0

Any light will be appreciated! :)

1

1 Answers

0
votes

As the Kafka bible (aka Kafka-The Definitive Guide) says:

The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a nonretriable failure, commitAsync() will not retry.

The reason:

It does not retry is that by the time commitAsync() receives a response from the server, there may have been a later commit that was already successful.

Imagine that we sent a request to commit offset 2000. There is a temporary communication problem, so the broker never gets the request and therefore never responds. Meanwhile, we processed another batch and successfully committed offset 3000. If commitA sync() now retries the previously failed commit, it might succeed in committing offset 2000 after offset 3000 was already processed and committed. In the case of a rebalance, this will cause more duplicates.

Beside that, you still can create an increasing sequence number , which you can increase every time you commit and add that number to the Callback object. When the time to retry comes, just check if the current value of the Acc is equal to the number you gave to the Callback. If it does, it is safe and you can perform the commit. Otherwise, there has been a new commit and you should not retry the commit of this offset.

It seems a lot of troubles, and that is because if you are thinking on this, you should change your strategy.