For particular reasons I need to use both - ConsumerGroup
(a.k.a. high-level consumer) and SimpleConsumer
(a.k.a. low-level consumer) to read from Kafka. For ConsumerGroup
I use ZooKeeper-based config and am completely satisfied with it, but SimpleConsumer
requires seed brokers to be instantiated.
I don't want to keep list of both - ZooKeeper and broker hosts. Thus, I'm looking for a way to automatically discover brokers for a particular topic from ZooKeeper.
Because of some indirect information I belief that these data is stored in ZooKeeper under one of the following paths:
/brokers/topics/<topic>/partitions/<partition-id>/state
- /brokers/ids/
However, when I try to read data from these nodes, I'm getting serialization error (I'm using com.101tec.zkclient
for this):
org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64 elided Caused by: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) at java.io.ObjectInputStream.(ObjectInputStream.java:299) at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ... 69 more
I can write and read custom Java objects (e.g. Strings) without any problem, so I believe it's not a problem of a client, but rather tricky encoding. Thus, I want to know:
- If this is the right way to go, how to read these nodes properly?
- If the whole approach is wrong, what is the right one?