I am new to Kafka. Tried to implement consumer and producer classes to send and receive messages. Need to configure bootstrap.servers
for both classes which is a list of broker's ip and port separated by ,
. For example,
producerConfig.put("bootstrap.servers",
"172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
Since the application will be running on the master node of a cluster, it should be able to retrieve the broker information from ZooKeeper
just like the answer to Kafka: Get broker host from ZooKeeper.
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids) {
String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
System.out.println(id + ": " + brokerInfo);
}
}
However this brokerInfo is in Json format which looks like this: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
In this same post, another one suggested the following way of getting connection string for each broker and join them together with comma.
for (String id : ids) {
String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
if (broker != null) {
brokerList.add(broker.connectionString());
}
}
If this Broker
class is from org.apache.kafka.common.requests.UpdateMetadataRequest.Broker
, it does not have methods createBroker
and connectionString
.
Found another similar post Getting the list of Brokers Dynamically. But it did not say how to get the attribute from broker info such as host
and port
. I can probably write a parser for the json like string to extract them, but I suspect there is more Kafka native way to do that. Any suggestions?
EDIT: I realized the Broker
class is from kafka.cluster.Broker
. Still it does not have method connectionstring()
.
service/kafka/v1/connection
endpoint. Just wondering if there is another way to do it without calling api. – ddd