
I am not sure if this is the right question to ask in this forum. We have been consuming from a Kafka topic in Storm using the Storm KafkaSpout connector, and it was working fine until now. Now we are supposed to connect to a new Kafka cluster running the upgraded version 0.10.x from the same Storm environment, which runs Storm version 0.10.x.

From the Storm documentation (http://storm.apache.org/releases/1.1.0/storm-kafka-client.html) I can see that Storm 1.1.0 is compatible with Kafka 0.10.x onwards, supporting the new Kafka consumer API. But in that case I won't be able to run the topology on my end, since we are on Storm 0.10.x (please correct me if I am wrong).

Is there any workaround for this? I have seen that even though the new Kafka consumer API has removed the ZooKeeper dependency, we can still consume messages using the old kafka-console-consumer.sh by passing the --zookeeper flag instead of the new --bootstrap-server flag (which is the recommended one). I ran this command using Kafka 0.9 and was able to consume from a topic hosted on Kafka 0.10.x.

When we try to connect, we get the below exception:

    java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/mytopic/partitions
        at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[stormjar.jar:?]
        at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[stormjar.jar:?]

But we are able to connect to the remote ZK server and have validated that the path exists:

    ./zkCli.sh -server remoteZKServer:2181

    [zk: remoteZKServer:2181(CONNECTED) 5] ls /brokers/topics/mytopic/partitions
    [3, 2, 1, 0]

As we can see above, it gives the expected output, since the topic has 4 partitions.

At this point I have the below questions:

1) Is it at all possible to connect to Kafka 0.10.x using Storm version 0.10.x? Has anyone tried this?

2) Even if we are able to consume, do we need to make any code changes in order to retrieve the message offsets in case of a topology shutdown/restart? I am asking this because we will be passing the ZK cluster details instead of the broker info, as supported in the old KafkaSpout version.

Running out of options here; any pointers would be highly appreciated.

UPDATE:
We are able to connect to and consume from the remote Kafka topic while running it locally using Eclipse. To make sure Storm does not use the in-memory ZK, we have used the overloaded constructor LocalCluster("zkServer", port); it is working fine and we can see the data coming in. This led us to conclude that version compatibility might not be the issue here.
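
For reference, the local test is wired up roughly like this. This is a simplified sketch; the host name, topic, zkRoot and spout id are placeholders rather than our real values:

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    public class LocalKafkaTest {
        public static void main(String[] args) throws Exception {
            // Old storm-kafka spout: broker metadata is read from ZooKeeper under /brokers
            ZkHosts hosts = new ZkHosts("remoteZKServer:2181");
            SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic", "/kafka-offsets", "my-spout-id");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
            // ... downstream bolts omitted ...

            // Overloaded constructor so Storm talks to the remote ZK instead of spinning up an in-memory one
            LocalCluster cluster = new LocalCluster("remoteZKServer", 2181L);
            cluster.submitTopology("local-kafka-test", new Config(), builder.createTopology());
        }
    }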

However, we still have no luck when deploying the topology in the cluster. We have verified connectivity from the Storm box to the ZK servers, and the znode seems fine as well.

At this point we really need some pointers: what could possibly be wrong, and how do we debug it? We have never worked with Kafka 0.10.x before, so we are not sure what exactly we are missing.

Would really appreciate some help and suggestions.


1 Answer


Storm 0.10.x is compatible with Kafka 0.10.x. We can still use the old KafkaSpout, which relies on the ZooKeeper-based offset storage mechanism.
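
For reference, the offset-related settings on the old spout look roughly like this. It is just a sketch; the ZK host, zkRoot and id are placeholders:

    import java.util.Arrays;

    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;

    public class OffsetConfigExample {
        public static void main(String[] args) {
            // Broker metadata is discovered from ZooKeeper under /brokers
            ZkHosts hosts = new ZkHosts("remoteZKServer:2181");
            SpoutConfig cfg = new SpoutConfig(hosts, "mytopic", "/kafka-offsets", "my-spout-id");

            // Offsets are committed to ZooKeeper under zkRoot + "/" + id, so after a topology
            // shutdown/restart the spout resumes from the last committed offset without extra code.
            // By default the Storm cluster's own ZooKeeper is used; to store offsets elsewhere, set:
            cfg.zkServers = Arrays.asList("remoteZKServer");
            cfg.zkPort = 2181;
        }
    }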

The connection loss exception occurred because we were trying to reach a remote Kafka cluster that did not accept connections from our end. We needed to open the specific firewall port so that the connection could be established. Also, when running a topology in cluster mode, all the supervisor nodes must be able to talk to ZooKeeper, so the firewall should be open for each one of them.