2 votes

I am testing the resilience of Kafka (Apache kafka_2.12-1.1.0). What I expect is that the ISR of a topic should grow back on its own (i.e., replicate to an available node) whenever a node crashes. I spent 4 days googling for possible solutions, to no avail.

I have a 3-node cluster and created 3 brokers and 3 ZooKeepers on it (1 node = 1 broker + 1 ZooKeeper) using Docker (wurstmeister), and updated the following in server.properties:

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
min.insync.replicas=2
default.replication.factor=3

I started all brokers, waited a minute, and created a topic with replication factor 3 and min in-sync replicas 2:

bin/kafka-topics.sh --create --zookeeper 172.31.31.142:2181,172.31.26.102:2181,172.31.17.252:2181  --config 'min.insync.replicas=2' --replication-factor 3 --partitions 1 --topic test2

When I describe the topic I see the following:

bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test2
Topic:test2     PartitionCount:1        ReplicationFactor:3     Configs:min.insync.replicas=2
        Topic: test2    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

So far so good. Now I start consumers, followed by producers. When consumption is at full throttle I kill broker #2. Now when I describe the same topic I see the following ([Edit-1]):

bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test2
Topic:test2     PartitionCount:1        ReplicationFactor:3     Configs:min.insync.replicas=2
        Topic: test2    Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3,1

bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic __consumer_offsets
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
        Topic: __consumer_offsets       Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,3
        Topic: __consumer_offsets       Partition: 1    Leader: 3       Replicas: 2,3,1 Isr: 1,3
    .. .. ..

[end of edit-1]

I let the Kafka producer and consumer continue for a couple of minutes. Q1: why does Replicas still show 2 when broker 2 is down?

Now I added 2 more brokers to the cluster. While the producer and consumers continue, I keep observing the ISR; the number of ISR replicas doesn't increase, it sticks to 3,1 only. Q2: why is the ISR not increasing even though 2 more brokers are available?

Then I stopped the producer and consumer, waited a couple of minutes, and re-ran the describe command -- still the same result. Q3: when does the ISR expand its replication? When there are 2 more nodes available, why did the ISR not replicate?

I create my producer as follows:

props.put("acks", "all");
props.put("retries", 4);
props.put("batch.size", new Integer(args[2]));// 60384
props.put("linger.ms", new Integer(args[3]));// 1
props.put("buffer.memory", args[4]);// 33554432
props.put("bootstrap.servers", args[6]);// host:port,host:port,host:port etc
props.put("max.request.size", "10485760");// 1048576

and my consumer as follows:

props.put("group.id", "testgroup");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", args[2]);// 1000
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("max.partition.fetch.bytes", args[3]);// 52428800
props.put("fetch.max.bytes", args[4]);// 1048576
props.put("fetch.message.max.bytes", args[5]);// 1048576
props.put("bootstrap.servers", args[6]);
props.put("max.poll.records", args[7]);
props.put("max.poll.interval.ms", "30000");
props.put("auto.offset.reset", "latest");

In a separate experiment, when I removed another broker I started seeing errors that the total in-sync replicas are less than the minimum required. Surprisingly, in this state the producer is not blocked, but I see the error in the broker's server.log, and no new messages are getting enqueued. Q4: shouldn't the producer be blocked, instead of an error being thrown on the broker side? Or is my understanding wrong?

Any help please?


2 Answers

3 votes

If I understand correctly, Kafka does not auto-rebalance when you add brokers. A down replica will not be reassigned unless you use the partition reassignment tool (kafka-reassign-partitions.sh).
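A minimal sketch of that tool, assuming broker 2 is the dead one and one of the newly added brokers got id 4 (the topic name and the other broker ids are taken from the question; adjust for your cluster):

```shell
# reassign.json: move test2 partition 0 off dead broker 2 onto broker 4
cat > reassign.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "test2", "partition": 0, "replicas": [4, 3, 1] }
  ]
}
EOF

# apply the new assignment (Kafka 1.1.0 still addresses ZooKeeper directly)
bin/kafka-reassign-partitions.sh --zookeeper zookeeper:2181 \
    --reassignment-json-file reassign.json --execute

# re-run with --verify until it reports the reassignment completed
bin/kafka-reassign-partitions.sh --zookeeper zookeeper:2181 \
    --reassignment-json-file reassign.json --verify
```

Once the reassignment completes and the new replica catches up, it joins the ISR; note that the first broker in the new replicas list also becomes the preferred leader.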

It's not clear what the differences are between your environments, but it looks like you didn't really kill a broker if it's still listed as a leader.

If you had two brokers down with a min ISR of 2, then yes, you'll see errors. The producer should still be able to reach at least one broker, though, so I don't think it'll be completely blocked unless you set acks to all. The errors on the broker end are more related to placing replicas.

3 votes

A recap of the meaning of "replica": all partition replicas are replicas, even the leader; in other words, 2 replicas means you have the leader and one follower.

When you describe the topic, for your only partition you see "Replicas: 2,3,1 Isr: 3,1", which means that when the topic was created the leader partition was assigned to broker 2 (the first in the replicas list) and followers were assigned to brokers 3 and 1; broker 2 is now the "preferred leader" for that partition.

This assignment is not going to change by itself (the leader may change, but not the "preferred leader"), so your followers will not move to other brokers; only the leader role can be given to another in-sync replica. (There is a property auto.leader.rebalance.enable which, when set to true, allows the leader role to go back to the preferred leader once it is up again; otherwise the leader role is kept by the newly elected leader.)

Next time, try to kill the leader broker and you will see that a new leader is elected and used, but "Replicas: 2,3,1" will stay.
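If you don't want to wait for auto.leader.rebalance.enable, Kafka 1.1.0 also ships a tool to hand leadership back to the preferred leader on demand; a sketch using the topic from the question (the JSON file is optional: without --path-to-json-file the election runs for all partitions):

```shell
# topics.json: restrict the preferred-leader election to test2 partition 0
cat > topics.json <<'EOF'
{ "partitions": [ { "topic": "test2", "partition": 0 } ] }
EOF

bin/kafka-preferred-replica-election.sh --zookeeper zookeeper:2181 \
    --path-to-json-file topics.json
```

This only moves the leader role back to the first broker in the replicas list (if it is alive and in sync); it does not change the replica assignment itself.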

And if you set replication-factor=3, acks=all, and min.insync.replicas=2, you can produce as long as 2 replicas acknowledge the writes (the leader and one follower), but you will get logs on the broker if it is not possible to maintain 3 ISRs...

Hope this helps...