
I'm running a 3-node Kafka cluster on AWS.

Kafka version:
Zookeeper version: 3.4

While performing a few stability tests, I noticed that messages get lost when I take the leader node down.

These are the steps to reproduce the issue:

Create a topic with replication-factor 3, which should make the data available on all 3 nodes:

~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper ",," --create --topic stackoverflow --replication-factor 3 --partitions 20
Created topic "stackoverflow".
~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper ",," --describe --topic stackoverflow
Topic:stackoverflow    PartitionCount:20    ReplicationFactor:3    Configs:
    Topic: stackoverflow    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 3    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 4    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 5    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 6    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 7    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 8    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 9    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 10    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 11    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 12    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 13    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 14    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 15    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 16    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 17    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 18    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 19    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
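Before and after the test it is worth confirming that every partition really has all three replicas in sync, i.e. that Isr matches Replicas on every line of the --describe output. The sketch below parses that text format as it appears above; the function name and the regex are illustrative assumptions, not part of any Kafka tool.

```python
import re

# Matches one partition line of `kafka-topics.sh --describe` output as shown above.
# Leader may be -1 when a partition has no leader; Isr may be empty.
PARTITION_RE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


def under_replicated_partitions(describe_output):
    """Return the partition ids whose ISR does not contain every replica."""
    lagging = []
    for line in describe_output.splitlines():
        match = PARTITION_RE.search(line)
        if not match:
            continue  # header line ("Topic:... PartitionCount:...") or blank line
        partition = int(match.group(1))
        replicas = set(match.group(3).split(","))
        isr = set(filter(None, match.group(4).split(",")))  # drop empty strings
        if isr != replicas:
            lagging.append(partition)
    return lagging
```

An empty list means every partition's ISR is complete; running it again after the killed node rejoins shows whether it has caught up on every partition.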

Start producing on that topic with the following code:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['', '', ''])

count = 0
try:
    while True:
        # send() only queues the message; count tracks queued, not acknowledged, messages
        producer.send('stackoverflow', b'message')
        count += 1
except KeyboardInterrupt:
    print("Sent %s messages." % count)

At this point I kill one of the machines and wait until it rejoins the cluster.

When it's back, I stop the producer and consume all the messages from that topic:

from kafka import KafkaConsumer

consumer = KafkaConsumer('stackoverflow',
                         bootstrap_servers=['', '', ''],
                         auto_offset_reset='earliest')  # assumed: read the topic from the beginning

count = 0
try:
    for message in consumer:
        count += 1
        print(message)
except KeyboardInterrupt:
    print("Received %s messages." % count)

Two messages that have been sent are missing. The producer didn't return any errors.

kafka $ python producer.py
Sent 762 messages.

kafka $ python consumer.py
Received 760 messages.
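One way to debug this further is to send a sequence number instead of the constant 'message', so the consumer can tell exactly which messages went missing (and, via message.partition and message.offset on each received record, where). A minimal sketch of the comparison, assuming the producer is changed to send str(count).encode() as the value; find_missing is a hypothetical helper:

```python
def find_missing(sent_count, received_values):
    """Return the sequence numbers in range(sent_count) that never arrived.

    Assumes each message value is the ASCII-encoded sequence number, e.g. b"17".
    """
    received = {int(value) for value in received_values}  # int() accepts bytes like b"17"
    return sorted(set(range(sent_count)) - received)
```

On the consumer side you would collect message.value for every record and call find_missing(762, values) once the topic has been drained.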

I'm new to Kafka, so I'd really appreciate any ideas for debugging this further, or instructions for making the cluster more resilient.

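I ran into exactly the same issue some time ago. During my investigation I found an interesting detail: as stated in the documentation, the flush() method returns once every message in the buffer has either been sent or its request has resulted in an error. In other words, flush() returning does not mean that every message was delivered; failures only show up on the individual futures returned by send().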
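Because flush() does not raise when individual requests fail, the errors have to be read from the futures returned by send(). A sketch, assuming kafka-python, whose send futures expose a failed() method and an exception attribute; the helper name is hypothetical, and the producer is passed in so the function can be exercised without a broker:

```python
def send_all_and_collect_failures(producer, topic, payloads, timeout=30):
    """Send payloads asynchronously, flush, then inspect every future.

    flush() blocks until each buffered record is either acknowledged or
    failed, but it does not raise for failed records, so we check each one.
    Returns a list of (payload, exception) pairs for the failed sends.
    """
    futures = [producer.send(topic, payload) for payload in payloads]
    producer.flush(timeout=timeout)  # all futures are done after this returns
    failures = []
    for payload, future in zip(payloads, futures):
        if future.failed():
            failures.append((payload, future.exception))
    return failures
```

With a real KafkaProducer, a non-empty result tells you exactly which messages were not delivered, instead of the loss going unnoticed.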

I mitigated it by:

  1. Disabling unclean.leader.election.enable on the brokers (if it is not set, it defaults to true in Kafka < 0.11 and to false in Kafka >= 0.11, so you need to set it to false explicitly on your 0.10.2)
  2. Changing the asynchronous producer (send & flush) to a synchronous one: producer.send(...).get()
  3. Adding the parameter retries=5 to the KafkaProducer constructor (so that the producer survives a broker shutdown).
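Steps 2 and 3 on the producer side can be sketched as follows, assuming kafka-python, where the future returned by send() has get(timeout=...) that blocks until the broker acknowledges the record or raises the error that made it fail. The producer itself would be created as KafkaProducer(bootstrap_servers=[...], retries=5); produce_synchronously is a hypothetical helper that takes the producer as an argument so it can be tested without a broker:

```python
def produce_synchronously(producer, topic, payloads, timeout=10):
    """Send payloads one at a time, waiting for each acknowledgement.

    future.get() blocks until the record is acknowledged or raises the send
    error, so a lost message surfaces as an exception instead of vanishing.
    Returns the number of acknowledged messages.
    """
    acknowledged = 0
    for payload in payloads:
        producer.send(topic, payload).get(timeout=timeout)
        acknowledged += 1  # only counted after the broker has acknowledged it
    return acknowledged
```

Waiting on every send is much slower than fire-and-forget, but the returned count now reflects acknowledged messages rather than queued ones.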

In the end, I figured out that the reason for the lost messages was an insufficient number of retries. After reading a few blog posts about highly available Kafka, I noticed that people recommend really high values for the "retries" parameter.

In Python that would be:

import sys
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=[...], retries=sys.maxsize)
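A rough way to see why retries=5 can be too few: between attempts the producer waits retry_backoff_ms (documented as 100 ms by default in kafka-python), so five retries add up to only about half a second of backoff. If leader failover and the metadata refresh take longer than that, the send fails, and with unchecked send() calls nobody notices. The sketch below ignores the time spent on the requests themselves, so it is a lower bound rather than an exact figure; the function name is hypothetical.

```python
def min_retry_window_ms(retries, retry_backoff_ms=100):
    """Lower bound on how long a producer keeps retrying a single record.

    Only counts the backoff pauses between attempts; the real window is
    longer because each attempt also spends time on the request itself.
    """
    return retries * retry_backoff_ms
```

min_retry_window_ms(5) is 500, i.e. roughly half a second, whereas a very large retries value keeps the producer retrying until the new leader is available.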

I ran my tests again and confirmed that no messages were lost.