11
votes

I have a Kafka consumer that seems to work for a while and then die, and it does this repeatedly. I get this exception but no other information:

org.apache.kafka.common.errors.TimeoutException:
Failed to get offsets by times in 305000 ms

305000 ms is just over 5 minutes. Is there any clue about what might cause this, or steps to try to find out?

In case it's relevant:

I have 3 processes on different machines, using the latest Java Kafka client, version 0.10.2.0. Each machine runs 20 threads, and each thread has its own Consumer. By design, when one thread dies, all threads are killed, the process exits, and it is then restarted. This means ~20 consumers die and restart at roughly the same time, which triggers a rebalance, so it's possible the clients periodically interfere with one another. That doesn't explain why I get this exception in the first place, however.

I have three Kafka machines and three Zookeeper machines. Each client has all 3 Kafka machines in its bootstrap.servers configuration. The topic has 200 partitions, meaning that each thread is assigned approx 3 partitions. The topic has a replication factor of 2.

There are no errors in the Kafka or Zookeeper logs.

The following config values are set, no others.

  • bootstrap.servers
  • group.id
  • key.deserializer
  • value.deserializer
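
For reference, with only those four settings each thread's consumer presumably looks something like this minimal sketch (targeting the 0.10.2 client); the broker hosts, group id, topic name, and processing logic here are placeholders, not taken from the actual setup:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerThread implements Runnable {
        @Override
        public void run() {
            Properties props = new Properties();
            // Only these four settings; everything else is left at the client defaults.
            props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092"); // placeholder hosts
            props.put("group.id", "my-consumer-group");                            // placeholder group
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));         // placeholder topic
                while (!Thread.currentThread().isInterrupted()) {
                    // 0.10.2 client: poll(long timeoutMs); newer clients use poll(Duration)
                    ConsumerRecords<String, String> records = consumer.poll(1000L);
                    for (ConsumerRecord<String, String> record : records) {
                        process(record);
                    }
                }
            }
            // An uncaught exception escaping here is what, by design, tears down all
            // 20 threads and the process, leading to the simultaneous restarts above.
        }

        private void process(ConsumerRecord<String, String> record) {
            // application-specific handling (placeholder)
        }
    }
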
Were you trying to forcefully set the offset for topic partitions for each consumer? – Nikesh Devaki
Sorry, this was a year ago; I can't remember the context. – Joe
@Joe how did you resolve this? – xabhi
I'm sorry, this was a year and a half ago! I would recommend making sure you're running an up-to-date version, and that the client and servers are on the same version. – Joe

1 Answer

4
votes

I ran into this today. I saw two different versions of the error message, depending on whether I was using the Kafka 1.0 or the Kafka 2.0 client libraries: "org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 305000 ms" with the 1.0 client, and "org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 30003ms" with the 2.0 client.

I received this message when trying to monitor offsets/lag with the kafka-consumer-groups command (e.g. kafka-consumer-groups --bootstrap-server {servers} --group {group} --describe). These commands are part of the Kafka/Confluent tools, but I imagine this could happen to other clients.
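
For what it's worth, the same kind of "offsets by times" lookup can be reproduced directly from the Java client, which can help narrow down whether a specific partition is the problem. This is only a minimal sketch; the broker hosts, group id, and topic name are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetsByTimesCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092"); // placeholder hosts
            props.put("group.id", "offsets-check");                                 // placeholder group
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                String topic = "my-topic"; // placeholder topic
                // Ask every partition for the first offset at or after "one minute ago".
                // If a partition has no leader, the lookup cannot complete and is
                // expected to time out with "Failed to get offsets by times in ... ms".
                Map<TopicPartition, Long> query = new HashMap<>();
                for (PartitionInfo p : consumer.partitionsFor(topic)) {
                    query.put(new TopicPartition(topic, p.partition()),
                              System.currentTimeMillis() - 60_000L);
                }
                Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
                offsets.forEach((tp, oat) ->
                        System.out.println(tp + " -> " + (oat == null ? "no match" : String.valueOf(oat.offset()))));
            }
        }
    }
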

The problem seemed to be that I had a topic with a replication factor of 1 that had partitions without an assigned leader. The only way I found this was by updating the {kafka_client_dir}/libexec/config/tools-log4j.properties file to log at the DEBUG level: log4j.rootLogger=DEBUG, stderr. Note that this is the log4j config file for the Kafka/Confluent tools, so YMMV for other clients; I am running them from my Mac.
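
Concretely, the only change needed in that file should be the root logger line; the stderr appender it references is already defined in the stock tools-log4j.properties shipped with the Kafka/Confluent tools:

    # tools-log4j.properties — raise the root logger from its default level to DEBUG
    log4j.rootLogger=DEBUG, stderr
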

When this was done, I saw the following message in the output, which alerted me to the ISR/offlineReplicas issue:

    [2019-01-28 11:41:54,290] DEBUG Updated cluster metadata version 2 to Cluster(id = 0B1zi_bbQVyrfKwqiDa2kw, 
    nodes = [
        brokerServer3:9092 (id: 3 rack: null), 
        brokerServer6:9092 (id: 6 rack: null), 
        brokerServer1:9092 (id: 1 rack: null), 
        brokerServer5:9092 (id: 5 rack: null), 
        brokerServer4:9092 (id: 4 rack: null)], partitions = [

            Partition(topic = myTopicWithReplicatinFactorOne, partition = 10, leader = 6, replicas = [6], isr = [6], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 11, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 12, leader = none, replicas = [2], isr = [], offlineReplicas = [2]), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 13, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 14, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 2, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 3, leader = 5, replicas = [5], isr = [5], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 4, leader = 6, replicas = [6], isr = [6], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 5, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 6, leader = none, replicas = [2], isr = [], offlineReplicas = [2]), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 7, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 8, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 9, leader = 5, replicas = [5], isr = [5], offlineReplicas = []), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 0, leader = none, replicas = [2], isr = [], offlineReplicas = [2]), 
            Partition(topic = myTopicWithReplicatinFactorOne, partition = 1, leader = 3, replicas = [3], isr = [3], offlineReplicas = [])
        ], controller = brokerServer4:9092 (id: 4 rack: null)) (org.apache.kafka.clients.Metadata)

You can see above where it says offlineReplicas = [2], hinting at the issue. Note also that brokerServer2 is missing from the list of brokers.
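
As an alternative to turning on DEBUG logging, leaderless partitions can usually be spotted directly with the kafka-topics tool. A sketch, using the topic name from the metadata above and a placeholder ZooKeeper host (newer broker/tool versions take --bootstrap-server instead of --zookeeper):

    # show only the partitions whose leader is not available
    kafka-topics --describe --topic myTopicWithReplicatinFactorOne \
        --zookeeper zk1:2181 --unavailable-partitions
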

Ultimately, I restarted the affected broker (brokerServer2) to get it back in sync, and once that was done I had no issues using the command-line tools again. There are probably better ways to fix this than a broker restart, but it resolved the issue for me.