2
votes

I have an application that uses Kafka 1.0 as a queue. The topic has 80 partitions and 80 consumers running (kafka-python consumers).

By running the command:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup  --describe 

I see that one of the partitions is stuck at an offset, and the lag continuously increases as new records are added to it.

The output of the above command looks something like this:

TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                                             HOST
mytopic    37         1924            2782            858   kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx  /localhost
mytopic    38         2741            2742            1     kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx  /localhost
mytopic    39         2713            2713            0     kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx  /localhost
mytopic    40         2687            2688            1     kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx  /localhost

What causes this? Also, resetting the offset with the reset-offsets command is not desirable, since this server is not manually monitored on a regular basis.

The clients run in the background as parallel processes on a Linux machine:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092',
                         session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1,
                         auto_commit_interval_ms=100000, request_timeout_ms=350000,
                         max_partition_fetch_bytes=3*1024*1024,
                         value_deserializer=lambda m: json.loads(m.decode('ascii')))

for message in consumer:
    # value_deserializer has already parsed the JSON, so message.value is a dict.
    msg = message.value
    process_message(msg)

2 Answers

6
votes

If the consumer offset is not moving after some time, the consumer has likely stopped. If the consumer offset is moving but the consumer lag (the difference between the log end offset and the consumer offset) keeps increasing, the consumer is slower than the producer. If the consumer is slow, the typical solution is to increase the degree of parallelism on the consumer side, which may require increasing the number of partitions of the topic.

Read more at the Kafka docs.
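If the CLI is inconvenient to run by hand, the same comparison (committed offset vs. log end offset) can be done programmatically. Below is a minimal sketch with kafka-python, reusing the broker address, topic and group id from the question; end_offsets() and committed() should exist in recent kafka-python releases, but check against your installed version:

# Rough sketch: compute per-partition lag for a consumer group with kafka-python.
import json
from kafka import KafkaConsumer, TopicPartition

# A throwaway consumer used only to look up offsets; it never subscribes,
# so it does not join the group or trigger a rebalance.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='mygroup',
                         enable_auto_commit=False)

partitions = [TopicPartition('mytopic', p)
              for p in consumer.partitions_for_topic('mytopic')]

end_offsets = consumer.end_offsets(partitions)   # log-end offset per partition

for tp in partitions:
    committed = consumer.committed(tp)           # last committed offset, or None
    lag = end_offsets[tp] - (committed or 0)
    print(f'{tp.topic}[{tp.partition}] committed={committed} end={end_offsets[tp]} lag={lag}')

consumer.close()

A small script like this can run from cron and alert when a partition's lag keeps growing, which avoids the manual monitoring the question wants to avoid.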

To put it simply: you're producing more than you're consuming. You need to increase the rate of consumption to reduce the lag, which usually means making each consumer faster or adding more consumers. If you're just testing, the conclusion is simply that your consumer is too slow.
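Note that consumers in a group beyond the number of partitions sit idle, so scaling out usually also means adding partitions. A hedged sketch of doing that from Python follows; KafkaAdminClient only ships with newer kafka-python releases (not the 1.3.4 shown in the question), and the same thing can be done from the shell with kafka-topics.sh --alter. The target count of 160 here is just an example:

# Sketch: raise the partition count of 'mytopic' so more consumers can share the load.
from kafka.admin import KafkaAdminClient, NewPartitions

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# total_count is the desired total number of partitions, not the number to add.
admin.create_partitions({'mytopic': NewPartitions(total_count=160)})

admin.close()

Keep in mind that adding partitions only affects where new records land; existing records stay where they are, and keyed records will start mapping to different partitions.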

0
votes

I am facing a similar issue. I am no expert in Kafka and therefore need your input. I have 20 partitions and 20 worker pods running (a 1:1 ratio). Out of those 20 partitions, 2 are overloaded (not always the same ones). Since I have 18 other partitions, why aren't jobs being sent to those, to give the overloaded partitions a break?

If there is no key, does Kafka always do round-robin, irrespective of the current load on each partition?

For example, in the question above, one of the partitions has a lag of 858 while almost all of the others have a lag of 0 or 1. Why doesn't Kafka redistribute the load or start sending more records to the other partitions?
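Partition assignment is decided by the producer at send time; the brokers never move records between partitions based on consumer lag, so an overloaded partition keeps its backlog. With kafka-python, the default partitioner hashes the key when one is present and, as far as I know, picks among available partitions without looking at load when the key is None. If you want a strict spread, you can pass your own partitioner callable. The sketch below is illustrative only; round_robin_partitioner is a made-up helper, not part of kafka-python:

# Sketch: spread keyless records across partitions explicitly with a custom partitioner.
import itertools
from kafka import KafkaProducer

_counter = itertools.count()

def round_robin_partitioner(key_bytes, all_partitions, available_partitions):
    # Called by the producer for every record; ignore the key and cycle through partitions.
    partitions = available_partitions or all_partitions
    return partitions[next(_counter) % len(partitions)]

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         partitioner=round_robin_partitioner)

producer.send('mytopic', b'{"job": "example"}')
producer.flush()

Even with a perfectly even spread, a partition can still fall behind if its consumer is slow or stuck, so it is worth checking what that particular worker pod is doing as well.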