I have a consume-transform-produce app with exactly-once semantics in Kafka. The (transactional) produce phase writes new messages to the same topic that is then consumed (transactionally, with isolation.level=read_committed). Only one thread does this, and the consumer poll is guaranteed to happen after the producer's transaction commit. Right now I have only one poll statement per consume-transform-produce round.
Test case
When I run my test case, there are sometimes messages that some other producer sent (legitimately) before my producer's transaction commits. Then I observe the following:
My single poll statement returns only this foreign message, but not the message I produced a moment ago, although the transaction of the last round committed successfully.
Questions
- Am I missing something, so that my transaction result from the last round is not visible to the consumer in the next round?
- Do I have to issue multiple polls until one poll returns 0 records, which would tell me that all messages on the server in that partition have been read?
- Is it possible that Kafka cannot guarantee that all messages currently on the partition are read? Maybe there is no such thing as "I am finished reading this partition for now"?
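To make the second question concrete, here is a minimal sketch of the multi-poll loop I have in mind. It is deliberately Kafka-free so that it runs on its own: `pollOnce` is a hypothetical stand-in for a call like `consumer.poll(Duration.ofSeconds(2))`, and `PollUntilEmpty`, `drainUntilEmpty` and `maxPolls` are names made up for illustration. Whether an empty poll really proves that the partition is fully read is exactly what I am unsure about.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Kafka-free sketch of "poll until one poll returns 0 records".
final class PollUntilEmpty {

    // pollOnce is a hypothetical stand-in for consumer.poll(Duration.ofSeconds(2)).
    // maxPolls is an illustrative safety cap so the loop cannot spin forever
    // while other producers keep writing to the partition.
    static <T> List<T> drainUntilEmpty(Supplier<List<T>> pollOnce, int maxPolls) {
        List<T> all = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            List<T> batch = pollOnce.get();
            if (batch.isEmpty()) {
                break; // nothing came back from this poll, so stop the round
            }
            all.addAll(batch);
        }
        return all;
    }

    public static void main(String[] args) {
        // Simulates my test case: the first poll returns only the foreign record,
        // the second returns my own record, the third returns nothing.
        Iterator<List<String>> polls = Arrays.asList(
                Collections.singletonList("foreign"),
                Collections.singletonList("mine"),
                Collections.<String>emptyList()).iterator();

        List<String> seen = drainUntilEmpty(polls::next, 10);
        System.out.println(seen); // prints [foreign, mine]
    }
}
```

With a single poll per round, the simulated consumer above would have stopped after `foreign`, which matches what I see in the test case.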
Configuration
Transactional consumer
    final Map<String, Object> consumerConfig = new LinkedHashMap<>();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
    consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, ID);
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Transactional producer
    final Map<String, Object> producerConfig = new LinkedHashMap<>();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
    producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ID);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
My poll timeout is 2 seconds.
- My understanding is that transactional producers are automatically idempotent and acks=all
- My test case runs with only one broker and a replication factor of one, but of course I intend to use more in production
- I use Kafka 2.0
- My topic only has one partition
- My thread has its own consumer group and is assigned to this single partition