The docker-compose.yml file referenced here allows for easy setup of ZooKeeper and Kafka on 3 machines. Let's say the IP addresses of the 3 machines are 1.1.1.1, 1.1.1.2, and 1.1.1.3.
- Machine 1: zk1, kafka1
- Machine 2: zk2, kafka2
- Machine 3: zk3, kafka3
All the ZooKeeper IP addresses are specified for each zk instance. That is:
- zk1: zookeepers=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
- zk2: zookeepers=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
- zk3: zookeepers=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
We do the same for each Kafka instance:
- kafka1: zookeepers (KAFKA_ZOOKEEPER_CONNECT)=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
- kafka2: zookeepers (KAFKA_ZOOKEEPER_CONNECT)=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
- kafka3: zookeepers (KAFKA_ZOOKEEPER_CONNECT)=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
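As a sketch of what this looks like in compose form, here is an illustrative fragment for machine 1 only. The image names, ports, and every variable other than KAFKA_ZOOKEEPER_CONNECT are assumptions; the exact syntax (e.g. the ZOO_SERVERS format) varies by image and ZooKeeper version:

```yaml
# Illustrative fragment for machine 1 (1.1.1.1) only -- not the actual file.
version: '2'
services:
  zk1:
    image: zookeeper              # assumed image
    environment:
      ZOO_MY_ID: 1
      # Every zk node lists the whole ensemble (format assumed for ZooKeeper 3.4-style images)
      ZOO_SERVERS: server.1=1.1.1.1:2888:3888 server.2=1.1.1.2:2888:3888 server.3=1.1.1.3:2888:3888
  kafka1:
    image: wurstmeister/kafka     # assumed image
    environment:
      KAFKA_BROKER_ID: 1
      # Every broker lists all three ZooKeeper nodes
      KAFKA_ZOOKEEPER_CONNECT: 1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181
```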
This works well. I can send (produce) to one Kafka broker and get it from another broker using kafka-python:
```python
from kafka import KafkaProducer, KafkaConsumer

p = KafkaProducer(bootstrap_servers='1.1.1.2:9092')  # Send to one broker
p.send('topic1', b'1')
p.flush()  # send() is asynchronous; flush to make sure the message actually goes out

c = KafkaConsumer('topic1', bootstrap_servers='1.1.1.3:9092',
                  auto_offset_reset='earliest')  # Retrieve from another broker
next(c)  # This retrieves it properly
```
Now, the questions are:
- (1) How many ZooKeeper nodes do I need to specify for each ZooKeeper instance?
- (2) How many ZooKeeper nodes do I need to specify for each Kafka instance (broker)?
- (3) How many Kafka brokers do I need to specify for Kafka producers (bootstrap_servers)?
- (4) How many Kafka brokers do I need to specify for Kafka consumers (bootstrap_servers)?
I've done some experiments and read a bit as well, but it would help to have someone verify (especially since some of the answers I've read date from earlier Kafka versions).
Question 1
Each zk instance must specify all the other zk instances. This makes sense, as it defines the whole zk cluster. I am not sure, though, whether ZooKeeper can do node discovery from a partial list, something like:
- zk1: zookeepers=[zk1, zk2]
- zk2: zookeepers=[zk2, zk3]
- zk3: zookeepers=[zk3, zk1]
in which it discovers all the other nodes on its own.
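One reason the full ensemble is normally listed on every node is quorum: an ensemble of n ZooKeeper nodes needs a strict majority (n // 2 + 1) to serve requests, so it tolerates (n - 1) // 2 failures. This is just arithmetic, not ZooKeeper API; a small sketch:

```python
def quorum_size(n: int) -> int:
    """Smallest number of nodes forming a strict majority of an n-node ensemble."""
    return n // 2 + 1

def tolerated_failures(n: int) -> int:
    """How many nodes an n-node ensemble can lose and still serve requests."""
    return n - quorum_size(n)

# A 3-node ensemble (zk1, zk2, zk3) needs 2 nodes up and survives 1 failure.
print(quorum_size(3), tolerated_failures(3))  # -> 2 1
# 4 nodes tolerate no more failures than 3, which is why odd sizes are preferred.
print(quorum_size(4), tolerated_failures(4))  # -> 3 1
```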
Questions 2-4
I've tried setting kafka1 to only use zk1. Let's assume that we are talking about topic1, which has one partition, a replication factor of one, and is on machine 1.
Observations:
- Assuming zk1 is up, I can produce to kafka2 and consume from kafka3.
- If zk1 is down, I can produce to kafka1 successfully.
- If zk1 is down, I can also produce to kafka2 or kafka3 successfully.
- If zk1 is down, I can still consume from kafka1.
- If zk1 is down, consumption from kafka2 or kafka3 blocks until zk1 is back up. The messages that were sent while zk1 was down, but were marked by kafka2 or kafka3 as successfully sent, can be retrieved here.
Based on these observations:
- Supposedly you only need to specify at least one broker in bootstrap_servers, but it is still probably better to specify all of them.
- While ZooKeeper is down: we can produce, but consumers block because they need ZooKeeper to track consumer offsets. This is explained here, although that answer is still from 2015, when there were separate consumer APIs. I assume the explanation still holds, though.
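The bootstrap_servers point can be illustrated with a toy sketch: a client only needs one reachable address from the list, because any single broker can serve metadata for the whole cluster, which is why one entry works but listing all of them is safer. This is an illustration of the idea, not kafka-python internals; `first_reachable` and the `reachable` set are hypothetical stand-ins for real connectivity:

```python
def first_reachable(bootstrap_servers, reachable):
    """Return the first bootstrap server the client can reach.

    Loosely mimics how a Kafka client walks its bootstrap list: one
    reachable broker is enough to fetch metadata for the whole cluster.
    `reachable` stands in for actual network connectivity.
    """
    for server in bootstrap_servers:
        if server in reachable:
            return server
    raise ConnectionError("no bootstrap server reachable")

all_brokers = ['1.1.1.1:9092', '1.1.1.2:9092', '1.1.1.3:9092']

# With all brokers listed, losing the first one is fine:
print(first_reachable(all_brokers, {'1.1.1.2:9092', '1.1.1.3:9092'}))  # -> 1.1.1.2:9092

# With a single entry, that broker going down means no metadata at all:
try:
    first_reachable(['1.1.1.1:9092'], {'1.1.1.2:9092'})
except ConnectionError as e:
    print(e)  # -> no bootstrap server reachable
```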