I am creating consumers (a consumer group with a single consumer in it) as follows:
Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181");
properties.put("auto.offset.reset", "largest");
properties.put("group.id", groupId);
properties.put("auto.commit.enable", "true");

ConsumerConfig consumerConfig = new ConsumerConfig(properties);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);

consumerMap.entrySet().stream().forEach(
    streams -> {
        streams.getValue().stream().forEach(
            stream -> {
                KafkaBasicConsumer customConsumer = new KafkaBasicConsumer();
                try {
                    Future<?> consumerFuture = kafkaConsumerExecutor.submit(customConsumer);
                    kafkaConsumersFuture.put(groupId, consumerFuture);
                } catch (Exception e) {
                    logger.error("---- Got error : " + e.getMessage());
                    logger.error("Exception : ", e);
                }
            }
        );
    }
);
I have subscribed two consumers to the same topic. To unsubscribe a consumer, I keep its Future object and call consumerFuture.cancel(Boolean.TRUE);
I then subscribe the same consumer again using the consumer-creation code above, and it registers successfully. However, when the producer publishes again, the re-subscribed consumer does not receive any messages, whereas the other consumer, which stayed registered, does.
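To make the unsubscribe step concrete, here is a minimal, self-contained sketch of what Future.cancel(true) does with plain java.util.concurrent. FakeConnector is a hypothetical stand-in, not a Kafka class; the sketch only shows that cancelling the future interrupts the worker thread and leaves any separately held resource open until it is shut down explicitly. Whether that is what affects the real ConsumerConnector here is an assumption, not something confirmed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelSketch {
    // Hypothetical stand-in for a connector-like resource; NOT a Kafka class.
    static class FakeConnector {
        final AtomicBoolean open = new AtomicBoolean(true);
        void shutdown() { open.set(false); }
    }

    /** Returns { future.isCancelled(), connector still open after cancel }. */
    static boolean[] run() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        FakeConnector connector = new FakeConnector();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);

        Future<?> future = executor.submit(() -> {
            started.countDown();
            try {
                // Simulates a consumer blocked while waiting for messages.
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // cancel(true) interrupts the worker, which ends up here.
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();
            }
        });

        started.await();                      // make sure the task is running
        future.cancel(true);                  // interrupts the worker thread only
        finished.await(5, TimeUnit.SECONDS);  // wait for the task to unwind

        boolean openAfterCancel = connector.open.get();
        connector.shutdown();                 // explicit cleanup is a separate step
        executor.shutdownNow();
        return new boolean[] { future.isCancelled(), openAfterCancel };
    }

    public static void main(String[] args) throws Exception {
        boolean[] result = run();
        System.out.println("cancelled=" + result[0] + ", connectorStillOpen=" + result[1]);
    }
}
```

In this sketch the stand-in connector is still open after the cancel, because nothing in the cancelled task or in cancel itself calls shutdown on it.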
I am also checking the consumers' offsets. They are updated when the producer publishes, but the consumers are not getting the messages. Before producing:
Group  Topic  Pid  Offset  logSize  Lag
A      T1     0    94      94       1
B      T1     0    94      94       1
After producing:
Group  Topic  Pid  Offset  logSize  Lag
A      T1     0    95      97       2
B      T1     0    94      97       2
I cannot figure out whether this is a problem on the producer side (not enough partitions) or whether I have created the consumers incorrectly. I also do not understand what the logSize and Lag columns mean in this output.
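On the column question, a small sketch of one possible reading: it assumes that logSize is the number of messages written to the partition, that Offset is the group's committed position, and that Lag is the difference between the two. The class and method names are hypothetical, and the other rows above do not all fit this formula, so treat it as an assumption rather than a definition.

```java
public class LagSketch {
    // Assumption: lag is the number of messages in the partition log
    // that the group has not consumed yet, i.e. logSize - offset.
    static long lag(long logSize, long offset) {
        return logSize - offset;
    }

    public static void main(String[] args) {
        // Group A after producing: Offset 95, logSize 97.
        System.out.println(lag(97, 95)); // prints 2
    }
}
```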
Please let me know if you can help or if you need more details.