4
votes

I have a multi-threaded app that uses a producer class to produce messages. Earlier, I was creating a new KafkaProducer for each request, as below:

KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            isValidMsg[0] = false;
            exception.printStackTrace();
            saveOrUpdateLog(msgBean, producerType, exception);
            logger.error("ERROR:Unable to produce message.", exception);
        }
    }
});
producer.close();

Then I read the Kafka docs on the producer and learned that we should use a single producer instance for good performance.

So I created a single instance of KafkaProducer inside a singleton class.

Now, when and where should we close the producer? Obviously, if we close the producer after the first send request, every later send fails with:

java.lang.IllegalStateException: Cannot send after the producer is closed.

Or how can we reconnect the producer once it is closed? And what happens if the program crashes or throws an exception?

3 Answers

5
votes

Generally, calling close() on the KafkaProducer is sufficient to make sure all in-flight records have completed:

/**
 * Close this producer. This method blocks until all previously sent requests complete.
 * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
 * <p>
 * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
 * will be called instead. We do this because the sender thread would otherwise try to join itself and
 * block forever.</strong>
 * <p>
 *
 * @throws InterruptException If the thread is interrupted while blocked
 */

If your producer is used throughout the lifetime of your application, don't close it until you get a termination signal, then call close(). As the documentation says, the producer is safe to use in a multi-threaded environment, so you should reuse the same instance.

If you're sharing your KafkaProducer across multiple threads, you have two choices:

  1. Call close() from a shutdown hook registered via Runtime.getRuntime().addShutdownHook from your main execution thread
  2. Have your multi-threaded methods race to close the producer and allow only a single one to win.
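
Option 1 can be sketched in Java roughly as follows. To keep the example runnable without the Kafka client on the classpath, StubProducer is a made-up stand-in for KafkaProducer, and ProducerHolder is a hypothetical name for the singleton. In a real application the holder would wrap the actual producer built from your own configuration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for KafkaProducer (an assumption, not the Kafka API) so the sketch runs on its own.
class StubProducer implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void send(String message) {
        if (closed.get()) {
            throw new IllegalStateException("Cannot send after the producer is closed.");
        }
        // A real producer would buffer the record here and send it asynchronously.
    }

    boolean isClosed() {
        return closed.get();
    }

    @Override
    public void close() {
        closed.set(true); // idempotent: closing twice is harmless
    }
}

// Hypothetical holder: one shared producer for the whole application.
final class ProducerHolder {
    private static final StubProducer PRODUCER = new StubProducer();

    static {
        // Close the shared producer once, when the JVM begins shutting down.
        Runtime.getRuntime().addShutdownHook(new Thread(PRODUCER::close, "producer-shutdown"));
    }

    private ProducerHolder() {}

    static StubProducer get() {
        return PRODUCER;
    }
}
```

Worker threads would call ProducerHolder.get().send(...) and never close the producer themselves. Shutdown hooks run on normal exit and on signals such as SIGTERM, but not when the JVM is killed forcibly (for example with SIGKILL), so records still buffered at that point can be lost.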

A rough sketch of option 2 might look like this:

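object KafkaOwner {
  // Placeholder: build the producer from your own configuration.
  private def create(): KafkaProducer[String, Array[Byte]] = ???

  private var producer: KafkaProducer[String, Array[Byte]] = create()
  private var isClosed = false

  // Closes at most once; synchronized so only one thread wins the race.
  def close(): Unit = this.synchronized {
    if (!isClosed) {
      producer.close()
      isClosed = true
    }
  }

  // Returns the live producer, recreating it first if it was closed.
  def instance: KafkaProducer[String, Array[Byte]] = this.synchronized {
    if (isClosed) {
      producer = create()
      isClosed = false
    }
    producer
  }
}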
3
votes

As described in the Javadoc for KafkaProducer:

public void close()

Close this producer. This method blocks until all previously sent requests complete.
This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).

src: https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()

So you don't need to worry that your messages won't be sent, even if you call close immediately after send.

If you plan to use a KafkaProducer more than once, close it only after you've finished using it. If you still want a guarantee that your message has actually been sent before your method returns, rather than waiting in a buffer, use KafkaProducer#flush(), which blocks until the records buffered so far have been sent. You can also block on Future#get() if you prefer.
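
Blocking on the returned Future can be sketched as below. This is a minimal illustration that runs without the Kafka client: AsyncSender.send is a hypothetical stand-in for producer.send(record), and the payload length it returns is a made-up acknowledgement value. The 10-second timeout is likewise an arbitrary choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncSender {
    // Stand-in for producer.send(record): the result completes on a background thread.
    static Future<Long> send(String message) {
        return CompletableFuture.supplyAsync(() -> (long) message.length());
    }

    // Blocks until the send is acknowledged, fails, or the timeout expires.
    static long sendAndWait(String message) {
        Future<Long> ack = send(message);
        try {
            return ack.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("Interrupted while waiting for the send", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Send was not confirmed", e);
        }
    }
}
```

Waiting on every send this way trades throughput for certainty, since each call blocks until its own record is confirmed. flush() is the coarser alternative when a whole batch only needs to be confirmed once.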

There is also one caveat to be aware of if you don't plan to ever close your KafkaProducer (e.g. in short-lived apps, where you just send some data and the app terminates immediately afterwards). The KafkaProducer IO thread is a daemon thread, which means the JVM will not wait for this thread to finish before terminating. So, to ensure that your messages are actually sent, use KafkaProducer#flush(), the no-arg KafkaProducer#close(), or block on Future#get().
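
The daemon-thread behaviour can be illustrated with plain Java threads. In this sketch, DaemonDemo is a hypothetical class, the sleeping thread only imitates the producer's IO thread, and join() plays the role that flush() or close() plays for a real producer.

```java
class DaemonDemo {
    // Returns true if the background work finished before this method returned.
    static boolean sendThenWait() {
        final boolean[] delivered = {false};
        Thread io = new Thread(() -> {
            try {
                Thread.sleep(100); // pretend to send buffered records
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            delivered[0] = true;
        }, "stub-io-thread");
        io.setDaemon(true); // like the producer's IO thread: does not keep the JVM alive
        io.start();
        try {
            io.join(); // stands in for flush()/close(): wait for the pending work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered[0];
    }
}
```

Without the join(), a main method that returned right after io.start() could let the JVM exit before the daemon thread finished its work.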

0
votes

The Kafka producer is supposed to be thread-safe and frugal with its thread pool. You might want to use

producer.flush();

instead of

producer.close();

leaving the producer open until program termination or until you're sure you won't need it any more.

If you still want to close the producer, then recreate it on demand.

producer = new KafkaProducer<String, byte[]>(prop);