
Kafka consumer not receiving messages produced before the consumer gets started.

    import java.time.Clock;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class MyKafkaConsumer {
        private final KafkaConsumer<String, String> consumer;
        private final String TOPIC = "javaapp";
        private final String BOOTSTRAP_SERVERS = "localhost:9092";
        private int receivedCounter = 0;
        private final ExecutorService executorService = Executors.newFixedThreadPool(1);

        private final BlockingQueue<ConsumerRecords<String, String>> queue = new LinkedBlockingQueue<>(500000);

        private MyKafkaConsumer() {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaGroup6");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(TOPIC));
        }

        public static void main(String[] args) throws InterruptedException {
            MyKafkaConsumer perfKafkaConsumer = new MyKafkaConsumer();
            perfKafkaConsumer.consumeMessage();
            perfKafkaConsumer.runConsumer();
        }

        private void runConsumer() throws InterruptedException {
            consumer.poll(Duration.ofMillis(1000));
            while (true) {
                final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(10000));
                if (!consumerRecords.isEmpty()) {
                    System.out.println("Adding result in queue " + queue.size());
                    queue.put(consumerRecords);
                }
                consumer.commitAsync();
            }
        }

        private void consumeMessage() {
            System.out.println("Consumer starts at " + Instant.now());
            executorService.submit(() -> {
                while (true) {
                    ConsumerRecords<String, String> poll = queue.take();
                    poll.forEach(record ->
                        System.out.println("Received " + ++receivedCounter + " time " + Instant.now(Clock.systemUTC())));
                }
            });
        }
    }

ConsumerRecords are always empty

I checked the offsets using Kafka Tool.

I have also tried a different group name; it's not working. Same issue, i.e. poll() returns empty records.

However, if I start my consumer before the producer, it does receive the messages. (kafka-clients version 2.4.1)

This isn't the answer, but I notice you are processing messages asynchronously while committing without knowing whether they have been processed. Is this intentional? It will almost certainly lead to lost messages if the consumer restarts. To increase consumption throughput, it is better to just add more consumers to the group, but process messages in the thread that calls poll. Note that while there are messages on the topic, poll will not block, because the heartbeat thread prefetches while you process in the main thread. – Chris
Note that trying a different group name will certainly lead to the behaviour you describe, as the new group has no committed offsets and the default value of auto.offset.reset is latest. – Chris

1 Answer


The auto.offset.reset consumer setting controls where a new consumer group begins consuming from a topic. By default it is set to latest, which positions a group with no committed offsets at the end of the topic, so it only sees messages produced after it starts. Set it to earliest if consumer groups without committed offsets should start from the earliest offset in the topic.
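With the Java client, this can be set alongside the other consumer properties. A minimal sketch using the plain string config keys (the same keys the ConsumerConfig constants resolve to; group id and bootstrap address taken from the question):

```java
import java.util.Properties;

public class OffsetResetConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "KafkaGroup6");
        // Without this, a group with no committed offsets starts at the latest
        // offset and misses messages produced before the consumer started.
        props.put("auto.offset.reset", "earliest");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

Note that this only affects groups with no committed offsets; an existing group (like KafkaGroup6 after its first commit) resumes from its committed position regardless of this setting.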