I am trying to have x number of consumers access a specified topic in kafka but not consume the same messages. I want for example...
Consumer 1 pick up offset 1 Consumer 2 pick up offset 2 Consumer 1 pick up offset 3 Consumer 2 pick up offset 4
I want kafka to act as a queue for those two consumers. I noticed the group.id configuration and I assumed that you could use the same group and it would handle it accordingly but it does not seem to work the way I thought it would.
Here is the code I am using...
public void init(){
Properties props = new Properties();
props.put("bootstrap.servers", kafkaUrl);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "true");
props.put("group.id", "group1");
props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("event1", "event2"));
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
}
public void pollTopics() {
try {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
if(processor != null) {
kafkaThreadPool.execute(processor);
}
}
}catch (Exception e){
LOG.error("Polling exception occurred", e);
}
}
I want to be able to run this code in a cluster environment and have kafka be the queue. I want it to pull the message and go to the next offset at the same time, then the next kafka poll will grab the next offset. Is this possible? And if so what am I doing wrong?