3
votes

Kafka allows for asynchronous message sending through below methods on Producer (KafkaProducer) class:

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record) 
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)

Successes can be handled through
1) the Future<RecordMetaData> object or
2) onCompletion method invoked by the callback. Full method signature and usage of onCompletion is as below (taken from kafka docs)

`

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);  
producer.send(record,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               });

While failure needs to be handled through the Exception e passed to the onCompletion method

Fine every thing looks good so far.

But if I am getting it right, any reasonable information that can be obtained from exception or e object is stacktrace and exception message. What I mean to point out here is, e does not contain any information of the actual record sent. Or in other words, it does not contain a reference to the actual record that was sent to kafka broker. So what useful processing or handling can be done by the producer if the record was not sent successfully. Really not much. Why I say this is - ideally I would like to make a log of the failed message some where and then try to resend it. But with the little information (e) provided by framework, i feel this is not possible.

Can someone point out if I am right or wrong?

1
as a note I want to mention that, i don't mean to say that exception e is not helpful. It is certainly help full for debugging and so in finding the cause of error. But what I am not sure is does it contain enough information to determine which record has failed so I could resend or log that recordsamshers
Give a try to following version of Callback class: class CallbackFunc implements Callback { private int recordId; public CallbackFunc(int recordId) { this.recordId = recordId; } @Override public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) { Log.error("Error writing message to Kafka. Record ID: " + recordId); } } }Shahzad

1 Answers

2
votes

You could easily create a callback that receives the producerRecord as a constructor argument. So upon onCompletion with an exception, you can have complete knowledge of the producer record, and even try to send it again.

I dealt with the same issue. Created a callback that gets both producerRecord, and a callback handler that uses an executor service to send the record again. So eventually, I can tolerate any number of failures (e.g. network issues or kafka is down), and recover from it.