I have multi thread app which uses producer class to produce messages, earlier i was using below code to create producer for each request.where KafkaProducer was newly built with each request as below:
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
isValidMsg[0] = false;
exception.printStackTrace();
saveOrUpdateLog(msgBean, producerType, exception);
logger.error("ERROR:Unable to produce message.",exception);
}
}
});
producer.close();
Then I read Kafka docs on producer and come to know we should use single producer instance to have good performance.
Then I created single instance of KafkaProducer inside a singleton class.
Now when & where we should close the producer. Obviously if we close the producer after first send request it wont find the producer to resend messages hence throwing :
java.lang.IllegalStateException: Cannot send after the producer is closed.
OR how we can reconnect to producer once closed. Problem is if program crashes or have exceptions then?