0
votes

The docker-compose.yml file referenced here allows for easy setup of ZooKeeper and Kafka in 3 machines.

Let's have the IP addresses of the 3 machines as 1.1.1.1, 1.1.1.2, and 1.1.1.3.

  • Machine 1: zk1, kafka1
  • Machine 2: zk2, kafka2
  • Machine 3: zk3, kafka3

All the ZooKeeper IP addresses are specified for each zk instance. That is:

  • zk1: zookeepers=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
  • zk2: zookeepers=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
  • zk3: zookeepers=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]

We do the same for each Kafka instance:

  • kafka1: zookeepers (KAFKA_ZOOKEEPER_CONNECT)=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
  • kafka2: zookeepers (KAFKA_ZOOKEEPER_CONNECT)=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]
  • kafka3: zookeepers (KAFKA_ZOOKEEPER_CONNECT)=[zk1 (1.1.1.1), zk2 (1.1.1.2), zk3 (1.1.1.3)]

This works well. I can send (produce) to one Kafka broker and get it from the other broker using kakfa-python:

from kafka import KafkaProducer, KafkaConsumer

p = KafkaProducer(bootstrap_servers='1.1.1.2:9092')  # Send to one broker
p.send('topic1', b'1')

c = KafkaConsumer('topic1', bootstrap_servers='1.1.1.3:9092', auto_offset_reset='earliest')  # Retrieve from another broker
next(c)  # This retrieves it properly

Now, the questions are:

  • (1) How many ZooKeeper nodes do I need to specify for each ZooKeeper instance?
  • (2) How many ZooKeeper nodes do I need to specify for each Kafka instance (broker)?
  • (3) How many Kafka brokers do I need to specify for Kafka producers (bootstrap_servers)?
  • (4) How many Kafka brokers do I need to specify for Kafka consumers (bootstrap_servers)?

I've done some experiments and read a bit as well, but it helps to have someone verify (esp. since some of the answers I've read are still from earlier Kafka versions).

Question 1

Each zk instance must specify all the other zk instances. This makes sense, as this defines the whole zk cluster. Though, I am not sure if it is possible for zk to do node discovery, something like:

  • zk1: zookeepers=[zk1, zk2]
  • zk2: zookeepers=[zk2, zk3]
  • zk3: zookeepers=[zk3, zk1]

in which it discovers all the other nodes on its own.

Question 2-4

I've tried setting kafka1 to only use zk1.

Let's assume that we are talking about topic1, which has one partition, one replication factor, and is on machine 1.

Observations:

  • Assuming zk1 is up, I can produce to a kafka2 and consume from kafka3.
  • If zk1 is down, I can produce to kafka1 successfully.
  • If zk1 is down, I can also produce to kafka2 or kafka3 successfully.
  • If zk1 is down, I can still consume from kafka1.
  • If zk1 is down, consumption from kafka2 or kafka3 blocks until zk1 is back up. The messages that were sent while zk1 was down, but are marked by kafka2 or kafka3 as successfully sent can be retrieved here.

Based on these observations:

  • Supposedly you only need to specify at least one broker in bootstrap_servers, but it is still probably better to specify all of them.
  • While ZooKeeper is down: we can produce, but consumers block because it needs ZooKeeper to track consumer offsets. This is explained here, although this is still from 2015, when there were separate consumer APIs. I assume that the explanation here still holds though.
1
Regarding "Assuming zk1 is up, I can produce to a kafka2 and consume from kafka3"... how do you verify that your clients produce and consume to/from 2 and 3? Because that should not be possible, since the topic only exists on 1. When you specify 2 or 3 as bootstrap, the client receives metadata about all 3 servers, topics, partitions, etc. When the client interacts with the topic, it knows it needs to communicate with 1, not with 2 or 3.Christoph

1 Answers

0
votes

My thoughts on your questions:

(1) How many ZooKeeper nodes do I need to specify for each ZooKeeper instance?

Best is all of them, especially if you only have 3 nodes in the ensemble. If you only specify two, and then perform a rolling restart, each node will lose the connection to its only other known peer at some point in time during the restart, and with that the (indirect) connection to the ensemble (well, the other remaining, unknown node).

(2) How many ZooKeeper nodes do I need to specify for each Kafka instance (broker)?

Can't say. Again, ideally all of them. Specifying only one most likely works, until this node goes down, i.e. you're running a cluster with low fault tolerance.

(3) How many Kafka brokers do I need to specify for Kafka producers (bootstrap_servers)?

(4) How many Kafka brokers do I need to specify for Kafka consumers (bootstrap_servers)?

In theory just one because during the initial handshake, the Kafka node provides the connecting client with information about all the other Kafka nodes. That's why it is important to correctly specify the advertised listeners in your cluster (these are all the endpoints under which your nodes can be reached from the "outside" by clients). Once the handshake is complete, you could turn off that server, and assuming that your topics have >1 replication, your cluster should still be fully functional.

However, if clients only know the address of one bootstrap server, if that server goes down they will not be able to initiate new connections to the cluster, i.e. reduced fault tolerance.