5 votes

We have a Kafka cluster with three brokers (node ids 0, 1, 2) and a ZooKeeper ensemble with three nodes.

We created a topic "test" on this cluster with 20 partitions and a replication factor of 2, and we are using the Java producer API to send messages to it. One of the Kafka brokers intermittently goes down, after which it is unrecoverable. To simulate this, we killed one of the brokers manually. As per the Kafka architecture, the cluster is supposed to recover on its own, but that is not happening. When I describe the topic on the console, I see the ISR count reduced to one for some of the partitions, since one of the brokers was killed. Now, whenever we try to push messages via the producer API (either the Java client or the console producer), we encounter a SocketTimeoutException. A quick look into the logs says "Unable to fetch the metadata":

 WARN [2015-07-01 22:55:07,590] [ReplicaFetcherThread-0-3][] kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-0-3],
 Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 23711; ClientId: ReplicaFetcherThread-0-3; 
 ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [zuluDelta,2] -> PartitionFetchInfo(11409,1048576),[zuluDelta,14] -> PartitionFetchInfo(11483,1048576). 
 Possible cause: java.nio.channels.ClosedChannelException


[2015-07-01 23:37:40,426] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:1,host:abc-0042.yy.xxx.com,port:9092] failed (kafka.client.ClientUtils$)
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
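
A minimal sketch of the kind of producer code we use with the 0.8 Java API (the broker host and message values here are placeholders, not our exact configuration):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker(s) contacted for the initial metadata fetch (placeholder host).
        props.put("metadata.broker.list", "abc-0042.yy.xxx.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("test", "some-key", "some-message"));
        producer.close();
    }
}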

Any leads will be appreciated...

How are you killing the server? – Jeff Gong
I am killing the process using kill -9. Will that really matter? I would expect Kafka to handle both controlled and uncontrolled shutdowns! – Thrinath Dosapati
Did you find a solution to this problem? Our Kafka producer lives within Tomcat, and every time a broker goes down, the producer hangs... – codejitsu
It could be your producer's consistency level. – Fish Biscuit

1 Answer

0 votes

From your error, Unable to fetch metadata, the most likely cause is that you have set bootstrap.servers in the producer to only the broker that has died.

Ideally, you should have more than one broker in the bootstrap.servers list, because if one of the brokers fails (or is unreachable), the others can still serve you the metadata.
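For example, with the 0.8.2+ Java producer (the extra host names below are hypothetical, following the pattern of the one in your log):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MultiBrokerProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Any live broker in this list can answer the initial metadata request,
        // so the producer survives a single dead broker.
        props.put("bootstrap.servers",
                "abc-0041.yy.xxx.com:9092,abc-0042.yy.xxx.com:9092,abc-0043.yy.xxx.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("test", "some-key", "some-message"));
        producer.close();
    }
}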

FYI: metadata is the information about a particular topic: how many partitions it has, which brokers lead each of them, which brokers are followers, and so on.

So, when a message is produced to a partition, that partition's leader broker is the one the message is sent to.
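You can inspect this metadata from the new Java producer itself; a small sketch (reusing the producer configured above):

import java.util.List;

import org.apache.kafka.common.PartitionInfo;

// Ask the producer for the current metadata of the topic "test":
// each entry shows the partition, its leader, and the size of its ISR.
List<PartitionInfo> partitions = producer.partitionsFor("test");
for (PartitionInfo p : partitions) {
    System.out.printf("partition=%d leader=%s replicas=%d isr=%d%n",
            p.partition(), p.leader(), p.replicas().length, p.inSyncReplicas().length);
}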

From your question, your ISR set has only one broker left. You could try setting bootstrap.servers to that surviving broker.