Kafka 1.1.1-cp1. (Edit 4: I ended up filing a bug with Kafka about this - https://issues.apache.org/jira/browse/KAFKA-7447)

I have 3 brokers, with min.insync.replicas = 2 for all topics, and offsets.commit.required.acks = -1.
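
As a rough illustration of what these two settings imply together (a sketch of the rule only, not broker code; the helper name is made up): a write that needs acknowledgement from all in-sync replicas, which is what offsets.commit.required.acks = -1 asks for on offset commits, is only accepted while the ISR has at least min.insync.replicas members.

```python
def accepts_acks_all_write(isr, min_insync_replicas=2):
    """Return True if a write needing acks from all in-sync replicas can be accepted.

    Hypothetical helper that mirrors the min.insync.replicas rule; not Kafka code.
    """
    return len(isr) >= min_insync_replicas


# Three replicas with one broker stopped: the ISR still has two members.
print(accepts_acks_all_write({0, 2}))  # True
# An ISR shrunk to a single broker falls below min.insync.replicas = 2.
print(accepts_acks_all_write({1}))     # False
```

So with three brokers, stopping exactly one should leave offset commits working, which matches what I see while the broker is down.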

When I stop one of the brokers, as you'd expect, it hands off the partitions it is leader for, and everything carries on as normal (consumers consuming, producers producing).

The problems start when I bring the broker back. It seems to cause confusion in the cluster, and some __consumer_offsets partitions are immediately truncated to offset 0.

Here's a selection of logs, in chronological order, from an affected __consumer_offsets partition (one that was originally led by the broker which went down). The story plays out across logs from all three brokers.

Essentially, the broker that I bounced comes back, seemingly can't understand what the new leader means, truncates to 0, and then persuades the other replicas to truncate to 0 as well.

prod-kafka-2: (just starting up)

[2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Based on follower's leader epoch, leader replied with an unknown offset in __consumer_offsets-29. The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)

prod-kafka-3: (sees replica1 come back)

[2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)

prod-kafka-2:

[2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling unloading of offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached groups. (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions __consumer_offsets-29 (kafka.server.ReplicaFetcherManager)
[2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous Leader Epoch was: 77 (kafka.cluster.Partition)
[2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling loading of offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:24:03,288] INFO [GroupMetadataManager brokerId=1] Finished loading offsets and group metadata from __consumer_offsets-29 in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

prod-kafka-3: struggling to agree with prod-kafka-2. Kicks it out of ISR, but then fights with ZooKeeper. Perhaps 2 and 3 both think they're leader?

[2018-09-17 09:24:15,372] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:15,377] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

prod-kafka-2: rudely kicks BOTH of the other two replicas out of the ISR list, even though prod-kafka-2 (broker.id=1) is the one we just restarted and is therefore the most likely to be behind. (Bear in mind that it already decided to truncate the partition to 0!)

[2018-09-17 09:24:16,481] INFO [Partition __consumer_offsets-29 broker=1] Shrinking ISR from 0,2,1 to 1 (kafka.cluster.Partition)

prod-kafka-3: still fighting with ZooKeeper. Eventually loses.

[2018-09-17 09:24:20,374] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:20,378] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:25,347] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:25,350] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:30,359] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:30,362] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:35,365] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:35,368] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:40,352] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:40,354] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:45,422] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:45,425] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:50,345] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:50,348] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:24:55,444] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:24:55,449] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:00,340] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:00,343] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:05,374] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:05,377] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:10,342] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:10,344] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:15,348] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:15,351] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:20,338] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:20,340] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:25,338] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:25,340] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:30,382] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:30,387] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:35,341] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:35,344] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:40,460] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:40,465] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2018-09-17 09:25:45,335] INFO [Partition __consumer_offsets-29 broker=2] Shrinking ISR from 0,2,1 to 0,2 (kafka.cluster.Partition)
[2018-09-17 09:25:45,338] INFO [Partition __consumer_offsets-29 broker=2] Cached zkVersion [1582] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
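
My reading of the repeated "Cached zkVersion ... skip updating ISR" lines is that they are what a failed conditional update looks like: the write only goes through if the znode's version still matches the one the broker has cached. A toy model of that check (a sketch in Python, not Kafka or ZooKeeper code; the class and variable names are invented for illustration):

```python
class VersionedNode:
    """Toy stand-in for a ZooKeeper znode holding partition state."""

    def __init__(self, data):
        self.data = data
        self.version = 0

    def conditional_set(self, data, expected_version):
        # The write is applied only when the caller's version is current.
        if expected_version != self.version:
            return False
        self.data = data
        self.version += 1
        return True


node = VersionedNode({"leader": 2, "isr": [0, 2, 1]})
cached_by_broker_2 = node.version  # broker 2 remembers the version it last saw

# Another writer (for example, whoever moves leadership) updates the state first.
node.conditional_set({"leader": 1, "isr": [0, 2, 1]}, node.version)

# Broker 2 retries with its stale version and is refused.
ok = node.conditional_set({"leader": 2, "isr": [0, 2]}, cached_by_broker_2)
print(ok)  # False
```

If that model is right, broker 2 keeps retrying with version 1582 every five seconds because something else has already changed the partition state, and it only stops once it learns about the new leader.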

prod-kafka-1: suddenly gets confused and also re-inits to 0, as prod-kafka-2 apparently becomes leader.

[2018-09-17 09:25:48,807] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote broker is not the leader for partition __consumer_offsets-29, which could indicate that the partition is being moved (kafka.server.ReplicaFetcherThread)

prod-kafka-3: finally decides that prod-kafka-2 is in charge, truncates accordingly

[2018-09-17 09:25:48,806] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions __consumer_offsets-29 (kafka.server.ReplicaFetcherManager)
[2018-09-17 09:25:48,807] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([__consumer_offsets-29, initOffset 0 to broker BrokerEndPoint(1,prod-kafka-2.c.i-lastfm-prod.internal,9092)] ) (kafka.server.ReplicaFetcherManager)
[2018-09-17 09:25:48,809] INFO [GroupMetadataManager brokerId=2] Scheduling unloading of offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:25:48,810] INFO [GroupMetadataManager brokerId=2] Finished unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached groups. (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:25:48,950] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Based on follower's leader epoch, leader replied with an unknown offset in __consumer_offsets-29. The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)
[2018-09-17 09:25:48,951] INFO [Log partition=__consumer_offsets-29, dir=/var/lib/kafka/data] Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)

prod-kafka-1: leadership inauguration confirmed.

[2018-09-17 09:25:50,207] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote broker is not the leader for partition __consumer_offsets-29, which could indicate that the partition is being moved (kafka.server.ReplicaFetcherThread)

prod-kafka-2: now that it has asserted its dominance via ZooKeeper, it adds prod-kafka-3 to the ISR list

[2018-09-17 09:25:50,210] INFO [Partition __consumer_offsets-29 broker=1] Expanding ISR from 1 to 1,2 (kafka.cluster.Partition)

prod-kafka-1: still struggling to accept reality, but eventually also truncates to 0.

[2018-09-17 09:25:51,430] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote broker is not the leader for partition __consumer_offsets-29, which could indicate that the partition is being moved (kafka.server.ReplicaFetcherThread)
[2018-09-17 09:25:52,615] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote broker is not the leader for partition __consumer_offsets-29, which could indicate that the partition is being moved (kafka.server.ReplicaFetcherThread)
[2018-09-17 09:25:53,637] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Remote broker is not the leader for partition __consumer_offsets-29, which could indicate that the partition is being moved (kafka.server.ReplicaFetcherThread)
[2018-09-17 09:25:54,150] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions __consumer_offsets-29 (kafka.server.ReplicaFetcherManager)
[2018-09-17 09:25:54,151] INFO [ReplicaFetcherManager on broker 0] Added fetcher for partitions List([__consumer_offsets-29, initOffset 0 to broker BrokerEndPoint(1,prod-kafka-2.c.i-lastfm-prod.internal,9092)] ) (kafka.server.ReplicaFetcherManager)
[2018-09-17 09:25:54,151] INFO [GroupMetadataManager brokerId=0] Scheduling unloading of offsets and group metadata from __consumer_offsets-29 (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:25:54,153] INFO [GroupMetadataManager brokerId=0] Finished unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached groups. (kafka.coordinator.group.GroupMetadataManager)
[2018-09-17 09:25:54,261] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Based on follower's leader epoch, leader replied with an unknown offset in __consumer_offsets-29. The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)
[2018-09-17 09:25:54,261] INFO [Log partition=__consumer_offsets-29, dir=/var/lib/kafka/data] Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)

prod-kafka-2: completes its coup of consumer offsets, all is now 0.

[2018-09-17 09:25:56,244] INFO [Partition __consumer_offsets-29 broker=1] Expanding ISR from 1,2 to 1,2,0 (kafka.cluster.Partition)
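
To find every affected partition rather than following one by hand, the broker logs can be scanned for the truncation warning and the ISR changes. A minimal sketch, assuming the log line formats shown above (the regexes and the function name are my own, and other Kafka versions may word these messages differently):

```python
import re

TRUNCATE_RE = re.compile(
    r"\[(?P<ts>[^\]]+)\] WARN \[ReplicaFetcher replicaId=(?P<replica>\d+), "
    r"leaderId=(?P<leader>\d+), fetcherId=\d+\] .* unknown offset in "
    r"(?P<partition>\S+)\. The initial fetch offset (?P<offset>\d+) "
    r"will be used for truncation"
)
ISR_RE = re.compile(
    r"\[(?P<ts>[^\]]+)\] INFO \[Partition (?P<partition>\S+) broker=(?P<broker>\d+)\] "
    r"(?P<action>Expanding|Shrinking) ISR from (?P<old>[\d,]+) to (?P<new>[\d,]+)"
)


def scan(lines):
    """Yield (timestamp, partition, description) for truncation and ISR events."""
    for line in lines:
        m = TRUNCATE_RE.search(line)
        if m:
            yield (m["ts"], m["partition"],
                   f"replica {m['replica']} truncates to {m['offset']} "
                   f"(leader {m['leader']})")
            continue
        m = ISR_RE.search(line)
        if m:
            yield (m["ts"], m["partition"],
                   f"broker {m['broker']} ISR {m['old']} -> {m['new']}")
```

Passing an open log file to `scan` and sorting the merged results from all three brokers by timestamp gives roughly the timeline I assembled by hand above.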

Edit:

As requested, here is the Kafka server.properties file:

broker.id=1
default.replication.factor=3
auto.create.topics.enable=false
min.insync.replicas=2
num.network.threads=12
num.io.threads=16
num.replica.fetchers=6
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=4
offsets.retention.minutes=10080
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.flush.interval.messages=20000
log.flush.interval.ms=10000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=60000
zookeeper.connect=prod-kafka-1:2181,prod-kafka-2:2181,prod-kafka-3:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=false
confluent.support.customer.id=anonymous
group.initial.rebalance.delay.ms=3000

And here is the zookeeper.properties file:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=prod-kafka-1:2888:3888
server.2=prod-kafka-2:2888:3888
server.3=prod-kafka-3:2888:3888
autopurge.purgeInterval=12
autopurge.snapRetainCount=6

Edit 2: Upgrading to Kafka 2.0.0 didn't seem to solve the problem.

It might be that my incoming rate is too high and that I need to throttle the producers when I know that my crashed server is about to recover? Does that sound right…?

Edit 3: Setting auto.leader.rebalance.enable=false solved the problem, but now I have to rebalance manually. However, manually rebalancing once all partitions are caught up doesn't seem to cause any issues.
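
Since the manual rebalance only seems safe once every partition is caught up, it may be worth checking that mechanically first. A minimal sketch, assuming the `Topic: ... Partition: ... Leader: ... Replicas: ... Isr: ...` lines that `kafka-topics --describe` prints (the function name is hypothetical, the sample text is made up, and the parsing is only as good as that format assumption):

```python
import re

PARTITION_RE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>\S+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def under_replicated(describe_output):
    """Return (topic, partition) pairs whose ISR does not contain every replica."""
    lagging = []
    for line in describe_output.splitlines():
        m = PARTITION_RE.search(line)
        if not m:
            continue  # topic summary lines and anything unrecognised
        replicas = set(m["replicas"].split(","))
        isr = set(filter(None, m["isr"].split(",")))
        if isr != replicas:
            lagging.append((m["topic"], int(m["partition"])))
    return lagging


# Made-up sample in the assumed format: partition 29 is missing broker 1.
sample = (
    "Topic: __consumer_offsets\tPartition: 29\tLeader: 2\tReplicas: 1,2,0\tIsr: 0,2\n"
    "Topic: __consumer_offsets\tPartition: 30\tLeader: 2\tReplicas: 2,0,1\tIsr: 2,0,1\n"
)
print(under_replicated(sample))  # [('__consumer_offsets', 29)]
```

An empty result is a necessary signal rather than a guarantee: in the logs above, the restarted broker was already back in the ISR before it took over leadership, so I would also leave some time after the ISR is complete before moving leaders back.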

You probably hit issues.apache.org/jira/browse/KAFKA-2729 under some condition. You might need to try a rolling restart of the ZooKeepers. - OneCricketeer
@cricket_007 I have edited with the properties files. Also, I don't think that KAFKA issue is the right one: all my replicas end up in sync, it's just that they also end up truncated to offset 0. It feels more like it might be issues.apache.org/jira/browse/KAFKA-6857? But that ticket makes it seem like an esoteric situation. This happens every single time I restart a broker. - Ben XO
Are you running brokers in containers? Is /var/lib/kafka/data mounted under some temporary file system that's cleared when you reboot? - OneCricketeer
By the way, a production-grade deployment would typically separate the ZooKeepers from the brokers. - OneCricketeer
Not in containers, and no tmpfs. Yes, I have considered separating the ZooKeepers, but I'm not sure what difference it would make. So far this is failing the simplest test: a clean shutdown of the broker. - Ben XO

1 Answer

There has been some progress since the question was asked. As indicated above, KAFKA-7447 was created to track this issue. That ticket has not been closed yet, but several people have reported that the problem is related to KAFKA-8896, which has now been resolved. (Those who hit the original issue no longer see it after moving to a version that includes this fix.)

As such, the issue should be fixed: using Kafka 2.2.2 or later (or any version that includes that patch) should ensure you do not run into this problem.