I am new to Kafka and am implementing consumer and producer classes to send and receive messages. Both need bootstrap.servers configured, which is a comma-separated list of broker host:port pairs. For example,

producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

Since the application will be running on the master node of a cluster, it should be able to retrieve the broker information from ZooKeeper, as in the answer to "Kafka: Get broker host from ZooKeeper":

public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
    List<String> ids = zk.getChildren("/brokers/ids", false);
    for (String id : ids) {
        String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
        System.out.println(id + ": " + brokerInfo);
    }
}

However, brokerInfo is a JSON string that looks like this: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}

In the same post, another answer suggested building a connection string for each broker and joining them with commas:

for (String id : ids) {
    String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
    Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
    if (broker != null) {
        brokerList.add(broker.connectionString());
    }
}

If this Broker class is org.apache.kafka.common.requests.UpdateMetadataRequest.Broker, it does not have the methods createBroker and connectionString.

I found another similar post, "Getting the list of Brokers Dynamically", but it does not say how to get attributes such as host and port from the broker info. I could probably write a parser for the JSON string to extract them, but I suspect there is a more Kafka-native way to do that. Any suggestions?
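
For reference, the parser route mentioned above could look roughly like the sketch below. It uses only the JDK and assumes the znode payload is flat and shaped like the sample shown earlier, with a string "host" field and a numeric "port" field. The class and method names (BrokerInfoParser, toHostPort, toBootstrapServers) are made up for illustration, and a real JSON library would be more robust than regular expressions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BrokerInfoParser {
    // Match "host":"<value>" and "port":<digits> in a flat payload like the sample.
    // The leading quote keeps "jmx_port" from matching the port pattern.
    private static final Pattern HOST = Pattern.compile("\"host\"\\s*:\\s*\"([^\"]+)\"");
    private static final Pattern PORT = Pattern.compile("\"port\"\\s*:\\s*(\\d+)");

    /** Returns "host:port" for one broker payload, or null if either field is missing. */
    static String toHostPort(String brokerInfo) {
        Matcher host = HOST.matcher(brokerInfo);
        Matcher port = PORT.matcher(brokerInfo);
        if (host.find() && port.find()) {
            return host.group(1) + ":" + port.group(1);
        }
        return null;
    }

    /** Joins every parsable payload into a comma-separated value for bootstrap.servers. */
    static String toBootstrapServers(List<String> brokerInfos) {
        List<String> hostPorts = new ArrayList<>();
        for (String info : brokerInfos) {
            String hostPort = toHostPort(info);
            if (hostPort != null) {
                hostPorts.add(hostPort);
            }
        }
        return String.join(",", hostPorts);
    }

    public static void main(String[] args) {
        // Sample payload copied from the question.
        String sample = "{\"jmx_port\":-1,\"timestamp\":\"1428512949385\","
                + "\"host\":\"192.168.0.11\",\"version\":1,\"port\":9093}";
        System.out.println(toHostPort(sample)); // prints 192.168.0.11:9093
    }
}
```

In the ZooKeeper loop, each brokerInfo string would be collected into a list and passed to toBootstrapServers. Payloads without a usable host or a non-negative port are skipped rather than guessed at.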

EDIT: I realized the Broker class is kafka.cluster.Broker. It still does not have a connectionString() method.

A general note regarding this approach and whether it's future-proof: The latest Kafka producer and consumer clients do not talk to ZooKeeper anymore. Since this change, most users have locked down access to ZooKeeper for anything but the Kafka brokers (servers) for improved security. In such a scenario your client applications would not be able to extract broker-related information from ZooKeeper because they are not allowed to talk to ZooKeeper in the first place. - Michael G. Noll
@miguno That's right. I did not have to set a ZooKeeper connection in either the producer or the consumer. So what would you suggest I do to get the broker IPs and ports? I could get the URL from the request (which is the IP of the master/bootstrap node) and make a GET call to the service/kafka/v1/connection endpoint. Just wondering if there is another way to do it without calling the API. - ddd

1 Answer

You could use ZkUtils to retrieve all the broker information in the cluster, as shown below:

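// Imports as found around Kafka 0.10.x; package locations differ in other versions.
import java.util.List;
import kafka.cluster.Broker;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.protocol.SecurityProtocol;
import scala.collection.JavaConversions;

ZkUtils zk = ZkUtils.apply("zkHost:2181", 6000, 6000, true);
List<Broker> brokers = JavaConversions.seqAsJavaList(zk.getAllBrokersInCluster());
for (Broker broker : brokers) {
    // assuming you do not enable security
    System.out.println(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host());
}
zk.close();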
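
On the point raised in the comments that newer clients no longer talk to ZooKeeper: bootstrap.servers is only used for the initial connection, after which the client fetches cluster metadata and discovers the remaining brokers on its own. So the list does not have to be complete, and a couple of known addresses are usually enough. A minimal sketch of that idea follows. The class and method names are hypothetical, the addresses are taken from the question, and only java.util.Properties is used so that it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

public class BootstrapConfigSketch {
    /** Builds a minimal client config holding only the bootstrap address list. */
    static Properties producerConfig(String bootstrapServers) {
        Properties props = new Properties();
        // Used only for the first connection; the client then learns the
        // full broker list from the cluster metadata it receives.
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // Two of the three brokers from the question: listing more than one
        // lets the client start even if one of them is down.
        Properties props = producerConfig("172.16.20.207:9946,172.16.20.234:9125");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting Properties object would then be passed to the producer or consumer constructor together with the serializer settings the client requires.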