1 vote

I have a simple Kafka setup. A producer produces messages at a high rate to a single partition of a single topic, and a single consumer consumes messages from that partition. During this process, the consumer may pause processing several times; a pause can last a couple of minutes. Once the producer stops producing, all queued-up messages are processed by the consumer. So messages produced by the producer are apparently not seen immediately by the consumer. I am using Kafka 0.10.1.0. What can be happening here? Here is the section of code that consumes the messages:

            while (true)
            {
                try
                {
                    // Poll with a 100 ms timeout, then hand each record off
                    // to the executor for asynchronous processing.
                    ConsumerRecords<String, byte[]> records = consumer.poll(100);
                    for (final ConsumerRecord<String, byte[]> record : records)
                    {
                        serviceThread.submit(() -> externalConsumer.accept(record));
                    }
                    // Commit asynchronously; note this runs before the
                    // submitted tasks have necessarily finished processing.
                    consumer.commitAsync();
                } catch (org.apache.kafka.common.errors.WakeupException e)
                {
                    // Swallowed; the loop simply polls again.
                }
            }

where consumer is a KafkaConsumer with auto commit disabled, max.poll.records set to 100, and a session timeout of 30000 ms. serviceThread is an ExecutorService.
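
For completeness, the consumer is constructed roughly like this (the broker address, group id, topic name, and deserializers below are placeholders, not the actual values):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
    props.put("group.id", "my-group");                 // placeholder group id
    props.put("enable.auto.commit", "false");          // auto commit disabled
    props.put("max.poll.records", "100");              // at most 100 records per poll
    props.put("session.timeout.ms", "30000");          // 30 s session timeout
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder topic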

The producer just calls KafkaProducer.send to send a ProducerRecord.
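
Roughly like this (the broker address, topic, key, and payload are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
    byte[] payload = "some message".getBytes();        // placeholder payload
    producer.send(new ProducerRecord<>("my-topic", "key", payload));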

All configurations on the broker are left at the Kafka defaults.

I am also using kafka-consumer-groups.sh to check what is happening while the consumer is not consuming messages. But when this happens, kafka-consumer-groups.sh hangs as well and does not return any information. Sometimes it triggers a consumer rebalance, but not always.
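
A typical invocation looks something like this (the group name is a placeholder; I believe the --new-consumer flag is needed on 0.10.x when the group uses the new consumer API):

    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 \
        --describe --group my-group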

Instead of 100 ms, did you get a chance to set a larger value for the poll timeout and retry? – amethystic
Tried 1000; still the same behavior. – Y.Zhou
By "stopping consuming messages", do you mean KafkaConsumer#poll always returns empty?amethystic
I figured out what is happening here. The Kafka broker is allowed to accumulate messages before they are written to disk. By default, the broker can accumulate messages for up to a minute, with no limit on how many messages it accumulates. The consumer is not able to access those messages until they have been flushed to disk. After reducing those values (see the sketch after these comments), the consumer pauses went away. – Y.Zhou
A similar expression of the "clients only see committed messages" guarantee provided by Kafka. – amethystic
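
For reference, the broker flush settings the resolution above refers to are presumably log.flush.interval.messages and log.flush.interval.ms. A server.properties sketch with purely illustrative values (not recommendations):

    # Flush the log to disk after at most this many messages:
    log.flush.interval.messages=10000
    # ...or after at most this many milliseconds:
    log.flush.interval.ms=1000

Note that forcing frequent flushes can hurt broker throughput; Kafka's defaults deliberately leave flushing to the operating system.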

1 Answer

0 votes

For those who may find this helpful: I've encountered this problem (where Kafka supposedly, silently stops consuming) often enough, and every single time it wasn't actually a problem with Kafka.

Usually it is some long-running or hung, silent operation that keeps the consumer from committing the offset, for example a DB client trying to connect to the database. If you wait long enough (e.g. 15 minutes for SQLAlchemy and Postgres), you will see an exception printed to stdout saying something like "connection timed out".
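
As an illustration, here is a minimal sketch (not the asker's exact code; the broker address, group id, topic, and the writeToDatabase helper are hypothetical) of how one hung call inside the poll loop stalls both consumption and offset commits:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class StuckConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "my-group");                // placeholder
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
                while (true) {
                    ConsumerRecords<String, byte[]> records = consumer.poll(100);
                    for (ConsumerRecord<String, byte[]> record : records) {
                        // If this call hangs (e.g. a DB connection attempt that
                        // only times out after many minutes), poll() is never
                        // reached again and no offsets are committed, so the
                        // consumer looks like it has silently stopped.
                        writeToDatabase(record);
                    }
                    consumer.commitSync(); // never reached while the handler above is stuck
                }
            }
        }

        // Hypothetical handler standing in for any slow external call.
        private static void writeToDatabase(ConsumerRecord<String, byte[]> record) {
            // ... e.g. open a DB connection and insert the record ...
        }
    }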