6
votes

At first all the brokers in cluster can start and work just fine, but sometimes one of the broker will meet problem. And there are some phenomenon will show up:

  • whole cluster is hang, nor producer and consumer are not work, hence the network flow is down to zero from monitor;
  • use kafka-topic.sh describe the topic message, every replica is just fine, even the exceptional brokerid, and information in zk also normal;
  • the file-description number increase gradually on abnormal broker, which is read from /proc/sys/fs/file-nr
  • netstat the broker listen port 9092 display lots of "CLOSE_WAIT" status

Following is the error log shows from others brokers, while the abnormal broker log can not see any exception.

[2019-04-15 18:10:25,243] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=289971597, epoch=1254343) t
o node 1: java.io.IOException: Connection to 1 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-04-15 18:10:25,244] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=
0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={atm_error_intf-7=(offset=538244, logStartOffset=5319, maxBytes=1048576, currentLeaderEpoch=Op
tional[13])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=289971597, epoch=1254343)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:241)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2019-04-15 18:10:57,275] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=
0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={ieg_qsm_guildstatechangereport4pandora-19=(offset=52347859, logStartOffset=38458463, maxBytes=1048576, currentLeaderEpoch=Optional[50]), ieg_qsm_playerreporthighfrequency4pandora-10=(offset=97212897, logStartOffset=65418413, maxBytes=1048576, currentLeaderEpoch=Optional[46]), ieg_qsmtest_guildstatechangereport4pandora-13=(offset=25771, logStartOffset=20917, maxBytes=1048576, currentLeaderEpoch=Optional[46]), __consumer_offsets-10=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[57]), ieg_qsmtest_playerreporthighfrequency4pandora-7=(offset=141317, logStartOffset=118323, maxBytes=1048576, currentLeaderEpoch=Optional[45]), __consumer_offsets-25=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[44]), ieg_qsmtest_playerlogin-15=(offset=59440, logStartOffset=52149, maxBytes=1048576, currentLeaderEpoch=Optional[55]), dm_pdl_wefeng_findfriend_topic-12=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[42]), dm_pdl_wefeng_findfriend_topic_test-0=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[47]), ieg_qsmtest_guildstatechangereport4pandora-18=(offset=21042, logStartOffset=16441, maxBytes=1048576, currentLeaderEpoch=Optional[56]), ieg_qsm_playerlogin-1=(offset=27414596, logStartOffset=17328842, maxBytes=1048576, currentLeaderEpoch=Optional[45]), atm_error_intf-7=(offset=538244, logStartOffset=5319, maxBytes=1048576, currentLeaderEpoch=Optional[13]), __consumer_offsets-30=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[51]), ieg_qsm_playerreporthighfrequency4pandora-15=(offset=87995984, logStartOffset=53470647, maxBytes=1048576, currentLeaderEpoch=Optional[55]), __consumer_offsets-45=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[47]), ieg_qsm_playerlogin-6=(offset=25070198, logStartOffset=16224757, maxBytes=1048576, currentLeaderEpoch=Optional[54]), ieg_qsmtest_playerreporthighfrequency4pandora-12=(offset=141878, logStartOffset=122257, maxBytes=1048576, currentLeaderEpoch=Optional[56]), dm_pdl_wefeng_findfriend_topic-17=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[56]), ieg_qsm_guildstatechangereport4pandora-14=(offset=45869398, logStartOffset=27847747, maxBytes=1048576, currentLeaderEpoch=Optional[47]), dm_pdl_wefeng_findfriend_topic_test-5=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[57]), atm_error_intf-27=(offset=539432, logStartOffset=5392, maxBytes=1048576, currentLeaderEpoch=Optional[13]), ieg_qsmtest_playerlogin-10=(offset=66712, logStartOffset=55774, maxBytes=1048576, currentLeaderEpoch=Optional[48]), __consumer_offsets-5=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[47])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=289971597, epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:241)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

-Xmx8G -Xms8G, this is the jvm heap size. I only can kill -9 the instance and restart to resume it.

  • version kafka_2.12-2.1.0
  • java
1

1 Answers

2
votes

This was an issue reported on kafka 2.1.0. Please check https://issues.apache.org/jira/browse/KAFKA-7802 for details. It is marked duplicated by https://issues.apache.org/jira/browse/KAFKA-7697 which is fixed in kafka 2.2.0, so upgrading the kafka version should help.