
I have a simple Kafka 2.4.1 (Confluent 5.4.1) setup running locally in Docker, with a test producer and a test consumer written in Java. The code is available on GitHub.

The unit tests do:

  • a producer produces one message to a single-partition topic
  • a consumer subscribes to the topic and polls Kafka for the available messages

The problem: on its first run, the consumer skips the messages that are already in the topic. Worse, those messages are lost from the consumer's point of view: the offset is moved to the latest offset in the topic and the lag is 0 (all of this is visible in Kafka Tool).

The results after the first run are:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:26:51 CET 2020
Offset: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.686 sec
Running com.example.TestKafkaConsumer
Record count: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.561 sec

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

The 2nd run of the tests gives:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:28:08 CET 2020
Offset: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.538 sec
Running com.example.TestKafkaConsumer
Record count: 1
offset = 1, key = static-key, value = this is the string message at Thu Mar 26 10:28:08 CET 2020
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.138 sec

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

I tried several variations, and the result is always the same:

  • waiting for Kafka to "warm up" before producing the first message
  • waiting some time between producing and consuming
  • producing several messages before consuming for the first time
  • producing and consuming without inspecting the topic in Kafka Tool (to rule out interference from a third-party tool)

Sometimes I also observed that a 2nd run of the consumer still missed the produced messages.


1 Answer


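Looking at the code in your GitHub repository, it looks like you are not setting the consumer configuration auto.offset.reset. According to the documentation, this setting defaults to latest. That means that if the consumer group has no committed offset for your test topic (for example, because the group is still unknown to the broker), the consumer starts at the end of the log and only receives messages produced after that point. Therefore, the messages that were written by your producer test beforehand are not consumed by your consumer test. Setting auto.offset.reset to earliest makes a group without committed offsets start from the beginning of the topic instead.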
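As a minimal sketch of that change, the consumer properties could be built as below. The class name, the broker address and the group id are assumptions made for illustration and are not taken from your repository; only the property keys and values are standard Kafka consumer settings. The sketch uses plain string keys so that it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not taken from the repository in the question.
final class ConsumerConfigSketch {

    // Builds consumer properties that read from the beginning of the topic
    // when the group has no committed offset yet.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The relevant line: the default is "latest", which skips existing messages
        // for a group without committed offsets.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "test-group" are assumed values for a local Docker setup.
        Properties props = consumerProps("localhost:9092", "test-group");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

In the test, these properties would be passed to the KafkaConsumer constructor. Note that the setting only applies when the group has no committed offset, so a group that has already committed the latest offset would need a new group.id (or an offset reset) before it sees the older messages.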

This documentation gives a nice explanation of the Consumer Group concept within Kafka.
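The role the consumer group plays here can be mimicked with a toy model. This is not Kafka code, and every name in it is made up for illustration; it only assumes the rule described above: a group with a committed offset resumes from it, and a group without one falls back to the reset policy.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Kafka code) of how a consumer group's starting offset is chosen.
final class OffsetResetModel {

    // Committed offset per consumer group, standing in for what the broker stores.
    private final Map<String, Long> committed = new HashMap<>();

    // Returns the offset from which a consumer of the given group starts reading.
    long startPosition(String group, String resetPolicy, long logStart, long logEnd) {
        Long saved = committed.get(group);
        if (saved != null) {
            return saved; // known group: resume from the committed offset
        }
        // Group without committed offset: the reset policy decides.
        return "earliest".equals(resetPolicy) ? logStart : logEnd;
    }

    void commit(String group, long offset) {
        committed.put(group, offset);
    }

    public static void main(String[] args) {
        OffsetResetModel model = new OffsetResetModel();
        // Run 1: one record (offset 0) is in the topic, so the log end offset is 1.
        long first = model.startPosition("test-group", "latest", 0, 1);
        System.out.println(first); // 1: nothing left to read
        model.commit("test-group", first);
        // Run 2: a second record (offset 1) was produced, so the log end offset is 2.
        System.out.println(model.startPosition("test-group", "latest", 0, 2)); // 1
    }
}
```

Under this model, the first run starts at offset 1 and reads nothing, and the second run resumes at offset 1 and reads exactly the record produced in that run, which matches the test output in the question.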