1
votes

I'm starting a cluster of Kafka brokers using Docker (5 brokers for example, one broker per container). Kafka version 2.12-0.11.0.0 (i.e. Kafka 0.11.0.0 built for Scala 2.12), ZooKeeper 3.4.10.

The scenario:

  • Starting the 1st broker with the config below

zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888

server.properties

broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker1_IP:broker1_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0

producer.properties

bootstrap.servers=localhost:9092
compression.type=none

consumer.properties

zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group

  • ZooKeeper is started in standalone mode, then Kafka is started

  • Creating topic

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-test-topic1

  • Sending message

echo "test_kafka1" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-test-topic1

  • Checking message

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-test-topic1 --max-messages 1

Message is received

  • Describe the topic

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test-topic1

Topic:my-test-topic1 PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: my-test-topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1

  • Starting the remaining 4 brokers

zoo.cfg on each broker from 1st to 5th (only the position of the 0.0.0.0:2888:3888 entry differs: each node uses it for its own server.N line)

tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888
server.2=broker2_IP:broker2_2888:broker2_3888
server.3=broker3_IP:broker3_2888:broker3_3888
server.4=broker4_IP:broker4_2888:broker4_3888
server.5=broker5_IP:broker5_2888:broker5_3888

server.properties on each broker from 1st to 5th (each broker.id is unique, broker_IP:broker_PORT differs for each broker)

broker.id=N
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker_IP:broker_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0

producer.properties on each broker from 1st to 5th

bootstrap.servers=localhost:9092
compression.type=none

consumer.properties on each broker from 1st to 5th

zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group

  • Restarting ZooKeeper on each broker so that the new zoo.cfg takes effect

  • The ZooKeeper nodes form a cluster

  • The topic has moved to broker 5

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test-topic1

Topic:my-test-topic1 PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: my-test-topic1 Partition: 0 Leader: 5 Replicas: 5 Isr: 5

Is this normal behavior? Or should the topic stay on broker 1?

  • Checking message on each broker

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-test-topic1 --max-messages 1

The message is lost. (It is not lost when the topic stays on broker 1, so the problem is intermittent.)
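To see which state each ZooKeeper node is in after the restart, one option is ZooKeeper's four-letter command `srvr`, whose reply includes a `Mode:` line (standalone, leader or follower). Below is a minimal sketch; the helper names `query_srvr` and `parse_mode` are made up for illustration, and port 2181 is assumed to be reachable from wherever the script runs.

```python
import socket


def query_srvr(host, port=2181, timeout=5.0):
    """Send the four-letter command 'srvr' to a ZooKeeper node and return the raw reply."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(b"srvr")
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:  # the server closes the connection after replying
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8", errors="replace")


def parse_mode(srvr_reply):
    """Return the value of the 'Mode:' line, or None if the reply has no such line."""
    for line in srvr_reply.splitlines():
        if line.startswith("Mode:"):
            return line.split(":", 1)[1].strip()
    return None
```

Calling `parse_mode(query_srvr(host))` for each of the five nodes should show one leader and four followers once the ensemble has formed. A node that has not joined a quorum typically answers without a `Mode:` line, in which case `parse_mode` returns `None`.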

2 Answers

0
votes

Have you tried raising tickTime to 6000? As far as I know, Hadoop's settings use this value by default, on the grounds that 2000 milliseconds is too low. I would think the same applies here. I am working on a very similar Kafka issue right now.
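For context, tickTime is the base unit for several other ZooKeeper settings: initLimit and syncLimit are counted in ticks, and the session timeout bounds default to 2 and 20 ticks when minSessionTimeout and maxSessionTimeout are not set explicitly (the zoo.cfg in the question does set them). A small sketch of the arithmetic, with `zk_timeouts_ms` being a hypothetical helper name:

```python
def zk_timeouts_ms(tick_time, init_limit, sync_limit):
    """Convert tick-based ZooKeeper limits to milliseconds."""
    return {
        # time followers get to connect to and sync with the leader
        "initLimit_ms": tick_time * init_limit,
        # how far a follower may lag behind the leader
        "syncLimit_ms": tick_time * sync_limit,
        # defaults that apply only when min/maxSessionTimeout are not set in zoo.cfg
        "default_minSessionTimeout_ms": 2 * tick_time,
        "default_maxSessionTimeout_ms": 20 * tick_time,
    }


# Values from the question's zoo.cfg, then the same limits with tickTime=6000.
current = zk_timeouts_ms(tick_time=2000, init_limit=10, sync_limit=5)
proposed = zk_timeouts_ms(tick_time=6000, init_limit=10, sync_limit=5)
print(current["initLimit_ms"], current["syncLimit_ms"])
print(proposed["initLimit_ms"], proposed["syncLimit_ms"])
```

With the question's values, followers get 20 seconds to sync at startup; with tickTime=6000 that window grows to 60 seconds.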

0
votes

In the Kafka documentation, both the config description and the config example recommend specifying all ZooKeeper servers in the broker's zookeeper.connect. Also, in production you are expected to run a separate ZooKeeper cluster and a separate Kafka cluster, rather than running Kafka and ZK together in one Docker container.
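As an illustration, zookeeper.connect takes a comma-separated list of host:port pairs. The sketch below builds that line from the placeholder host names used in the question; `zookeeper_connect` is a hypothetical helper, and client port 2181 on every node is an assumption (with Docker port mapping the externally visible port may differ).

```python
def zookeeper_connect(hosts, client_port=2181):
    """Build a zookeeper.connect value listing every ZooKeeper server."""
    return ",".join(f"{host}:{client_port}" for host in hosts)


# Placeholder host names taken from the question; replace with real addresses.
zk_hosts = [f"broker{n}_IP" for n in range(1, 6)]
line = "zookeeper.connect=" + zookeeper_connect(zk_hosts)
print(line)
```

The resulting line would replace zookeeper.connect=127.0.0.1:2181 in server.properties on every broker, so that each broker can reach the whole ensemble rather than only its local node.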

I imagine something like this could be happening:

  • due to some specifics of how you restart the Docker containers, ZKs 2-5 don't know that Kafka 1 has created a znode in ZK 1 describing your test topic as having "Replicas: 1, ISR: 1", or they don't agree to use ZK 1's version since there is no quorum
  • some subset of containers 2-5 starts, and 3 of the 5 ZKs form a quorum without waiting for ZK 1
  • something (a consumer, a command-line tool, or broker auto-creation) tries to use the topic and, since the ZK quorum agrees that it does not exist yet, creates it and assigns the replica to one of the currently available brokers (5 in this case)
  • container 1 starts, ZK 1 has to give up its version of the topic znode in favor of the quorum's, and Kafka 1 has to give up its replica in favor of the one currently described
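The quorum arithmetic behind this guess can be sketched as follows, assuming ZooKeeper's default majority quorum (no weights or groups configured). The function names are made up for illustration.

```python
def quorum_size(ensemble_size):
    """Smallest number of servers that forms a strict majority of the ensemble."""
    return ensemble_size // 2 + 1


def has_quorum(ensemble_size, running):
    """True if the running servers are enough to form a majority quorum."""
    return running >= quorum_size(ensemble_size)


# Five servers listed in zoo.cfg: any three of them can serve requests.
print(quorum_size(5))                 # 3
# ZKs 2-5 are up, ZK 1 is still restarting: they do not need to wait for it.
print(has_quorum(5, running=4))       # True
# ZK 1 alone, once it is configured as part of a five-node ensemble: no quorum.
print(has_quorum(5, running=1))       # False
```

So with five entries in zoo.cfg, nodes 2-5 can elect a leader among themselves, and the data held only by node 1 is not guaranteed to win.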

I'm not sure what the correct approach is for moving from a single-node ZooKeeper to a replicated setup, and I cannot find it in the docs. Perhaps you have to initially assign more weight to your first ZK so that it is guaranteed to become the leader and force its topic configuration on the other ZK nodes.

Have you created a JIRA issue? Got any response from the developers?