Obviously, your producer does not need to connect to broker3 :)
I'll try to explain what happens when you produce data to Kafka:
- You spin up some brokers, let's say 3, then create a topic foo with 2 partitions and replication factor 2. It's quite a simple example, yet it could be a real case for someone.
- You create a producer with metadata.broker.list (or bootstrap.servers in the new producer) configured to these brokers. Worth mentioning: you don't necessarily have to specify all the brokers in your cluster. In fact, you can specify only 1 of them and it will still work. I'll explain this in a bit too.
- You send a message to topic foo using your producer.
- The producer looks up its local metadata cache to see which brokers are leaders for each partition of topic foo and how many partitions the foo topic has. As this is the first send from this producer, the local cache contains nothing.
- The producer sends a TopicMetadataRequest to each broker in metadata.broker.list sequentially until it gets the first successful response. That's why I mentioned that 1 broker in that list would work, as long as it's alive.
- The returned TopicMetadataResponse contains information about the requested topics (in your case, foo) and about the brokers in the cluster. Basically, this response contains the following:
  - A list of brokers in the cluster, where each broker has an ID, host and port. This list may not include every broker in the cluster, but it should include at least the brokers that are responsible for serving the topic in question.
  - A list of topic metadata, where each entry has the topic name, the number of partitions, the leader broker ID for each partition and the ISR broker IDs for each partition.
- Based on the TopicMetadataResponse, your producer builds up its local cache and now knows exactly that a request for topic foo partition 0 should go to broker X.
- Based on the number of partitions in the topic, the producer picks a partition for your message and accumulates it, knowing that it will be sent to some broker as part of a batch.
- When the batch is full or the linger.ms timeout passes, your producer flushes the batch to the broker. By "flushes" I mean "opens a new connection to a broker or reuses an existing one, and sends the ProduceRequest".
The producer does not need to open unnecessary connections to all brokers, as the topic you are producing to may not be served by some of them, and your cluster could be quite large. Imagine a 1000-broker cluster with lots of topics, where one of the topics has just one partition: you only need that one connection, not 1000.
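To make the 1000-broker scenario concrete, here is a toy Python sketch of connections that are opened only on first use. It is not the Kafka client: LazyConnections, the broker ID 417 and the message count are all made up for illustration, and the extra connection used for the initial metadata lookup is ignored.

```python
class LazyConnections:
    """Opens a (pretend) connection only when a broker is first needed."""

    def __init__(self):
        self.open = {}  # broker ID -> connection placeholder

    def get(self, broker_id):
        if broker_id not in self.open:
            self.open[broker_id] = f"conn-to-broker-{broker_id}"
        return self.open[broker_id]


# Hypothetical cluster: 1000 brokers, and a topic whose single
# partition is led by broker 417 (an arbitrary, made-up ID).
cluster_size = 1000
partition_leaders = {0: 417}

connections = LazyConnections()
for _ in range(10_000):  # send many messages to the only partition
    connections.get(partition_leaders[0])

print(len(connections.open), "of", cluster_size)  # prints "1 of 1000"
```

However many messages are sent, the pool holds a single entry, because only the leader of the one partition is ever asked for.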
In your particular case I'm not 100% sure why you have 2 open connections to brokers when you have just a single partition, but I assume one connection was opened during metadata discovery and cached for reuse, and the second one is the actual connection used to produce data. However, I might be wrong here.
But anyway, there is no need at all to have a connection to the third broker.
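The steps from the list above can be put together in a small model. This is a rough Python sketch under stated assumptions, not the real client: fetch_metadata, ToyProducer, the stale-host entry and the leader assignment (partition 0 on broker1, partition 1 on broker2) are invented for illustration, and the CRC32-based partitioning is a simplification, not Kafka's actual partitioner.

```python
import zlib

# Made-up cluster state: 3 brokers, topic "foo" with 2 partitions.
BROKERS = {1: "broker1:9092", 2: "broker2:9092", 3: "broker3:9092"}
LEADERS = {"foo": {0: 1, 1: 2}}    # topic -> partition -> leader broker ID
UNREACHABLE = {"stale-host:9092"}  # a dead entry in the bootstrap list


def fetch_metadata(address, topic):
    """Stand-in for a metadata request; fails if the address is unreachable."""
    if address in UNREACHABLE:
        raise ConnectionError(address)
    return {"brokers": dict(BROKERS), "leaders": dict(LEADERS[topic])}


class ToyProducer:
    def __init__(self, bootstrap):
        self.bootstrap = bootstrap
        self.cache = {}         # topic -> metadata; empty before the first send
        self.contacted = set()  # every address this producer talked to

    def metadata_for(self, topic):
        if topic in self.cache:
            return self.cache[topic]
        # Try the bootstrap entries one by one until the first success.
        for address in self.bootstrap:
            try:
                self.cache[topic] = fetch_metadata(address, topic)
            except ConnectionError:
                continue
            self.contacted.add(address)
            return self.cache[topic]
        raise RuntimeError("no bootstrap broker reachable")

    def send(self, topic, key):
        """Pick a partition for the key and 'send' to that partition's leader."""
        meta = self.metadata_for(topic)
        partition = zlib.crc32(key) % len(meta["leaders"])
        leader = meta["brokers"][meta["leaders"][partition]]
        self.contacted.add(leader)
        return partition, leader


producer = ToyProducer(["stale-host:9092", "broker2:9092"])
for i in range(100):
    producer.send("foo", f"key-{i}".encode())

print(sorted(producer.contacted))
```

In this model broker3:9092 never shows up in the contacted set: it leads no partition of foo, and the metadata lookup already succeeded on an earlier entry of the bootstrap list.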
Regarding your question about "Should I always have number of brokers = number of partitions?", the answer is most likely no. If you explain what you are trying to achieve, maybe I'll be able to point you in the right direction, but this is too broad to explain in general. I recommend reading this to clarify things.
UPD to answer the question in the comments:
The metadata cache is updated in 2 cases:
- If the producer fails to communicate with a broker for any reason. This includes both the case when the broker is not reachable at all and the case when the broker responds with an error (like "I'm not leader for this partition anymore, go away").
- If no failures happen, the client still refreshes metadata every metadata.max.age.ms (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java#L42-L43) to discover new brokers and partitions itself.
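The two refresh triggers can be expressed as a small decision function. This is only a sketch of the rule described above, not the client's actual implementation: MetadataCache and its method names are hypothetical, and the 300000 ms value is just an illustrative setting for metadata.max.age.ms.

```python
class MetadataCache:
    """Toy model of the two refresh triggers: failures and age."""

    def __init__(self, max_age_ms):
        self.max_age_ms = max_age_ms
        self.last_refresh_ms = None  # None means "never fetched"
        self.force_refresh = False

    def on_send_failure(self):
        # Case 1: broker unreachable or it answered with an error.
        self.force_refresh = True

    def needs_refresh(self, now_ms):
        if self.last_refresh_ms is None or self.force_refresh:
            return True
        # Case 2: no failures, but the cached metadata is too old.
        return now_ms - self.last_refresh_ms >= self.max_age_ms

    def refreshed(self, now_ms):
        self.last_refresh_ms = now_ms
        self.force_refresh = False


cache = MetadataCache(max_age_ms=300_000)  # illustrative value
cache.refreshed(now_ms=0)
print(cache.needs_refresh(now_ms=1_000))    # False: fresh, no failures
cache.on_send_failure()
print(cache.needs_refresh(now_ms=1_000))    # True: a failure forces a refresh
cache.refreshed(now_ms=1_000)
print(cache.needs_refresh(now_ms=301_000))  # True: older than max_age_ms
```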