7
votes

I'm using Kafka 0.11.0.0. I have a test program that publishes to a Kafka topic; if the zookeeper and Kafka servers are down (which is normal in my development environment; I bring them up as needed) then the call to KafkaProducer<>.send() hangs indefinitely.

I either need to have send() return, preferably indicating the error; or I need a way to check whether the servers are up or down. Basically, I want my test tool to be able tell me, "Hey, dummy, start up Kafka!" instead of hanging.

Is there a way for my producer task to determine whether the servers are up or down?

I'm calling the send() like this:

kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY,
    message), (rm, ex) -> {
        System.out.println("**** " + rm + "\n**** " +ex);
});

I have linger.ms = 1; I've tried retries=0, 1, and 2, and send() still blocks. I've never seen the callback called.

Older messages suggest setting metadata.fetch.timeout.ms to a small value, but that's gone in 0.11. Others suggest calling command line utilities to see if the servers are OK...but the referenced utilities also seem to be gone.

What's the graceful way to get this done?

2
That is strange. It should return with an error saying either "Failed to update metadata" or "Expiring x number of records". Check request.timeout.ms and max.block.ms setting for your producer. By default request.timeout.ms is 60 seconds longShades88
request.timeout.ms is 30000, and max.block.ms is 60000. I'll try decreasing those. (When working interactively, 30 seconds might as well be indefinite--my bad.)Will Duquette
OK, yes, that solves my immediate problem; thank you!Will Duquette
I'd still like to know how proactively check whether the server is up, though.Will Duquette
If that helped you, could you please accept my answer :-)Shades88

2 Answers

4
votes

That is strange. It should return with an error saying either "Failed to update metadata" or "Expiring x number of records".

Check request.timeout.ms and max.block.ms setting for your producer. By default request.timeout.ms is 60 seconds long

3
votes

We can send messages to broker in three ways :

Fire-and-forget : We send a message to the server and don’t really care if it arrives successfully or not. Most of the time, it will arrive successfully, since Kafka is highly available and the producer will retry sending messages automatically. However, some messages will get lost using this method.

Asynchronous send We call the send() method with a callback function, which gets triggered when it receives a response from the Kafka broker.

Synchronous send We send a message, the send() method returns a Future object, and we use get() to wait on the future and see if the send() was successful or not.

The simplest way to send a message synchronously is as follows:

ProducerRecord<String, String> record =
            new ProducerRecord<>(KAFKA_TOPIC, KEY, message);
    try {
            producer.send(record).get();
    } catch (Exception e) {
             e.printStackTrace();
    }

Here, we are using Future.get() to wait for a reply from Kafka. This method will throw an exception if the record is not sent successfully to Kafka. If there were no errors, we will get a RecordMetadata object that we can use to retrieve the offset the message was written to.

hope this helps.