
I have Kafka client code that connects to Kafka brokers (server 0.10.1, client 0.10.2). The code uses two topics, each with its own consumer group, and it also has a producer. Once in a while (once in 2 days, once in 5 days, ...) the producer code throws a NetworkException. In the logs we see consumer group (re)joining messages for both consumer groups, followed by the NetworkException from the producer's future.get() call. We are not sure why we are getting this error.

Code:

final Future<RecordMetadata> futureResponse =
        producer.send(new ProducerRecord<>("ping_topic", "ping"));
futureResponse.get();

Exception:

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)

The Kafka API documentation describes NetworkException as follows:

"A misc. network-related IOException occurred when making a request. This could be because the client's metadata is out of date and it is making a request to a node that is now dead."
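For context: as far as I know, NetworkException is one of the client's retriable exceptions, and the 0.10.x producer defaults to `retries=0`, so a single broker disconnect surfaces straight from `future.get()`. Below is a minimal sketch of producer settings that would let the client retry on its own. The broker address `localhost:9092`, the values, and the class and method names are assumptions for illustration; the keys are the standard producer config names, written as plain strings so the snippet needs no Kafka classes.

```java
import java.util.Properties;

public class ProducerRetryConfig {
    // Builds producer settings using plain string keys (hypothetical helper).
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");              // wait for the in-sync replicas to acknowledge
        props.put("retries", "3");             // illustrative value; retry transient failures
        props.put("retry.backoff.ms", "500");  // illustrative pause between retries
        // One in-flight request per connection keeps ordering intact when a retry happens.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        producerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaProducer` constructor. Retries can still be exhausted, so the `future.get()` call should keep its own error handling.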

Thanks

Hi, did you find a solution? I don't think the Kafka community even wants to help resolve issues related to this problem, or perhaps they haven't found a solution yet. - Rakesh Yadav
@Rakesh The community can't solve a network problem that's very likely specific to the environment where Kafka is installed. At the very least, the producer and broker settings need to be added, and TRACE logging would need to be enabled on every component and captured. Then, if it is deemed an actual Kafka problem, maybe file a JIRA. - OneCricketeer
@cricket_007 I mean that if developers are facing issues with their APIs on forums like StackOverflow, they must respond if no solution has been provided for such questions. - Rakesh Yadav
@Rakesh "Must" is a strong word... We are all volunteers here. We don't have to respond, especially if there is not adequate information to go off of. - OneCricketeer

1 Answer


I had the same error while testing a Kafka consumer. I was using a sender template for the test. In the consumer configuration I additionally set the following properties:

 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);

After sending the message I added a thread sleep:

  ListenableFuture<SendResult<String, String>> future =
      senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);

  Thread.sleep(10000);

It was necessary to make the test work, but it may not be suitable for your case.
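If a fixed sleep is not an option, one alternative is to block on the returned future with a timeout and retry the send when it fails. This is only a sketch under assumptions: `BoundedSend` and `sendWithRetry` are hypothetical names, the `main` method uses a `CompletableFuture` as a stand-in for the real send call, and the helper retries on any failure. Real code would probably retry only when the cause is a retriable Kafka exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class BoundedSend {
    // Invokes the send up to maxAttempts times, waiting at most timeoutMs for each result.
    // The last failure is rethrown once the attempts are used up.
    static <T> T sendWithRetry(Supplier<Future<T>> send, int maxAttempts, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return send.get().get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (ExecutionException | TimeoutException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Stand-in for the real send: fails on the first call, succeeds on the second.
        Supplier<Future<String>> fakeSend = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (++calls[0] == 1) {
                f.completeExceptionally(new java.io.IOException("server disconnected"));
            } else {
                f.complete("ack");
            }
            return f;
        };
        System.out.println(sendWithRetry(fakeSend, 3, 1000));
    }
}
```

With the code from the question, the supplier would be something like `() -> producer.send(new ProducerRecord<>("ping_topic", "ping"))`. Keep in mind that resending after a disconnect can produce duplicates, because the broker may have written the first attempt before the connection dropped.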