Finally, I figured it out: reading messages from Kafka and writing the consumer offsets back to ZooKeeper are configured separately.
If you are running your topology on a Storm cluster (single node or multi node), make sure your storm.yaml file sets the
storm.zookeeper.servers
and
storm.zookeeper.port
properties, in addition to the zkHosts, zkRoot and consumer group id you pass to the spout.
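For reference, a minimal storm.yaml entry would look like this (the hostnames are placeholders for your own ZooKeeper nodes):

```yaml
# storm.yaml -- ZooKeeper ensemble used by the Storm cluster and,
# by default, by KafkaSpout for offset storage
storm.zookeeper.servers:
    - "zkserver1"
    - "zkserver2"
storm.zookeeper.port: 2181
```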
A better practice is to override these properties in your topology by setting the correct values when creating the KafkaSpout, for example:
BrokerHosts zkHosts = new ZkHosts(prop.getProperty("kafka.zkHosts"));
String topicName = prop.getProperty("kafka.topic");
String zkRoot = prop.getProperty("kafka.zkRoot");
String groupId = prop.getProperty("kafka.group.id");
String kafkaServers = prop.getProperty("kafka.zkServers");
String zkPort = prop.getProperty("kafka.zkPort");

// Kafka spout config: explicitly set the ZooKeeper servers and port
// used for offset storage, so the spout does not fall back to storm.yaml
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, zkRoot, groupId);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.zkServers = Arrays.asList(kafkaServers);
kafkaConfig.zkPort = Integer.valueOf(zkPort);
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
Alternatively, you can put these values in the topology's Config object. This is more flexible, since you might want to store offset info in one ZooKeeper cluster while your topology reads messages from a completely different set of brokers.
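A minimal sketch of that approach (topology name, spout id and hostnames are placeholders; package names assume Storm 1.x and the old storm-kafka spout). KafkaSpout.open() receives this Config as its conf map and falls back to these keys when SpoutConfig.zkServers / zkPort are left null:

```java
import java.util.Arrays;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

// ...

Config conf = new Config();
// Read by KafkaSpout.open() when SpoutConfig.zkServers / zkPort are null;
// "zkserver1"/"zkserver2" are hypothetical offset-storage ZooKeeper hosts
conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("zkserver1", "zkserver2"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 1);
// remaining bolts omitted
StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
```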
Here is the relevant KafkaSpout.open() snippet, showing how these values are resolved:
@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
    _collector = collector;
    Map stateConf = new HashMap(conf);

    // Prefer the values set on SpoutConfig; fall back to
    // storm.zookeeper.servers / storm.zookeeper.port from storm.yaml
    List<String> zkServers = _spoutConfig.zkServers;
    if (zkServers == null) {
        zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    }
    Integer zkPort = _spoutConfig.zkPort;
    if (zkPort == null) {
        zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
    }

    // Offsets are stored under TRANSACTIONAL_ZOOKEEPER_* in ZooKeeper
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
    stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
    _state = new ZkState(stateConf);

    _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

    // using TransactionalState like this is a hack
    int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
    if (_spoutConfig.hosts instanceof StaticHosts) {
        _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
    } else {
        _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
    }