Kafka allows asynchronous message sending through the following methods on the producer (`KafkaProducer`) class:
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
Successes can be handled through 1) the `Future<RecordMetadata>` object or 2) the `onCompletion` method invoked by the callback. The full method signature and usage of `onCompletion` is shown below (taken from the Kafka docs):
```java
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(record,
    new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                // The send failed; only the exception is reported here.
                e.printStackTrace();
            } else {
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
            }
        }
    });
```
Failure, on the other hand, needs to be handled through the `Exception e` passed to the `onCompletion` method.
Fine, everything looks good so far.
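To make the first option concrete, here is a minimal sketch of handling the result through the returned `Future`. It uses only the JDK so that it can run on its own: `CompletableFuture<Long>` stands in for the `Future<RecordMetadata>` that `send` returns, the `String` record stands in for a `ProducerRecord`, and `FutureResultSketch` and `awaitResult` are made-up names for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureResultSketch {

    // Blocks on the future and reports what happened to the given record.
    // The record is an ordinary parameter here, so it is still in scope on failure.
    static String awaitResult(String record, Future<Long> pending) {
        try {
            long offset = pending.get();
            return "sent " + record + " at offset " + offset;
        } catch (ExecutionException ex) {
            // get() wraps the original failure; the real cause is in getCause().
            return "failed " + record + ": " + ex.getCause().getMessage();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return "interrupted while waiting for " + record;
        }
    }

    public static void main(String[] args) {
        // Assumption: CompletableFuture stands in for the Future returned by send().
        CompletableFuture<Long> ok = CompletableFuture.completedFuture(42L);
        CompletableFuture<Long> bad = new CompletableFuture<>();
        bad.completeExceptionally(new RuntimeException("simulated broker error"));

        System.out.println(awaitResult("record-1", ok));
        System.out.println(awaitResult("record-2", bad));
    }
}
```

Note that calling `get()` blocks until the send completes, so this approach gives up the asynchronous behaviour in exchange for simpler control flow.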
But if I understand correctly, the only useful information that can be obtained from the exception object `e` is the stack trace and the exception message. My point is that `e` does not contain any information about the actual record sent; in other words, it does not hold a reference to the record that was sent to the Kafka broker. So what useful processing or handling can the producer do if the record was not sent successfully? Really not much.
I say this because ideally I would like to log the failed message somewhere and then try to resend it. But with the little information (`e`) provided by the framework, I feel this is not possible.
Can someone point out if I am right or wrong?
It is not that `e` is unhelpful: it is certainly helpful for debugging and for finding the cause of the error. What I am not sure about is whether it contains enough information to determine which record has failed, so that I could resend or log that record.
– samshers
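For what it is worth, a callback object is created per `send` call, so the callback itself can carry the record. The sketch below is an illustration under assumptions, not Kafka code: `SendCallback` and `Message` are hypothetical stand-ins shaped like Kafka's `Callback` and `ProducerRecord` so that the example runs with only the JDK, and `RecordAwareCallback` is a made-up name.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class FailedRecordSketch {

    // Hypothetical stand-in for ProducerRecord<K, V>.
    static final class Message {
        final String topic;
        final String key;
        final String value;

        Message(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in shaped like Kafka's Callback; a Long offset
    // replaces RecordMetadata to avoid the client dependency.
    interface SendCallback {
        void onCompletion(Long offset, Exception e);
    }

    // Callback that remembers which message it was created for.
    static final class RecordAwareCallback implements SendCallback {
        private final Message message;
        private final Queue<Message> failed;

        RecordAwareCallback(Message message, Queue<Message> failed) {
            this.message = message;
            this.failed = failed;
        }

        @Override
        public void onCompletion(Long offset, Exception e) {
            if (e != null) {
                // The failed message is known here, so it can be logged or queued for a retry.
                failed.add(message);
            }
        }
    }

    public static void main(String[] args) {
        // Thread-safe queue, since completions may arrive on another thread.
        Queue<Message> failed = new ConcurrentLinkedQueue<>();
        Message ok = new Message("the-topic", "k1", "v1");
        Message bad = new Message("the-topic", "k2", "v2");

        // Simulate one successful and one failed completion.
        new RecordAwareCallback(ok, failed).onCompletion(0L, null);
        new RecordAwareCallback(bad, failed)
                .onCompletion(null, new RuntimeException("simulated send failure"));

        for (Message m : failed) {
            System.out.println("Needs resend: key=" + m.key + ", value=" + m.value);
        }
    }
}
```

With the real client, the anonymous `Callback` in the snippet from the docs could presumably do the same by referring to the local `record` variable directly, provided that variable is effectively final.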