Kafka consumer not receiving messages produced before the consumer gets started.
public class MyKafkaConsumer {
private final KafkaConsumer<String, String> consumer;
private final String TOPIC="javaapp";
private final String BOOTSTRAP_SERVERS="localhost:9092";
private int receivedCounter=0;
private ExecutorService executorService=Executors.newFixedThreadPool(1);
private BlockingQueue<ConsumerRecords<String, String>> queue=new LinkedBlockingQueue<>(500000);
private MyKafkaConsumer() {
final Properties props=new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaGroup6");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumer=new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
}
public static void main(String[] args) throws InterruptedException {
MyKafkaConsumer perfKafkaConsumer=new MyKafkaConsumer();
perfKafkaConsumer.consumeMessage();
perfKafkaConsumer.runConsumer();
}
private void runConsumer() throws InterruptedException {
consumer.poll(Duration.ofMillis(1000));
while (true) {
final ConsumerRecords<String, String> consumerRecords=consumer.poll(Duration.ofMillis(10000));
if (!consumerRecords.isEmpty()) {
System.out.println("Adding result in queue " + queue.size());
queue.put(consumerRecords);
}
consumer.commitAsync();
}
}
private void consumeMessage() {
System.out.println("Consumer starts at " + Instant.now());
executorService.submit(() -> {
while (true) {
ConsumerRecords<String, String> poll=queue.take();
poll.forEach(record -> {
System.out.println("Received " + ++receivedCounter + " time " + Instant.now(Clock.systemUTC()));
});
}
});
}
}
ConsumerRecords are always empty
I checked the offset using Kafka tool
I have also tried with a different group name, it's not working. Same issue i.e. poll returns empty records
Although, if I start my consumer before than producer than its receiving the messages. (Kafka-client ver 2.4.1)