3
votes

While using Processor API of Kafka Streams, I use something like this:

context.forward(key,value)
context.commit()

Actually, what I'm doing here is sending forward a state from state store to sink every minute (using context.schedule() in init() method). What I don't understand here is:

[Key,Value] pair I'm sending forward and then doing commit() is taken from state store. It is aggregated according to my specific logic from many not sequential input [key,value] pairs. Each such output [key,value] pair is aggregation of few not ordered [key,value] pairs from input (kafka topic). So, I don't understand how Kafka cluster and Kafka Streams lib can know the correlation between the original input [key,value] pairs and the eventual output [key,value] that is being sent out. How it can be wrapped by transaction (fail-safe), if Kafka doesn't know the connection between input pairs and output pair. And what is actually being committed when I do context.commit()?

Thanks!

1

1 Answers

2
votes

To explain all this in details goes beyond what I can write here in an answer.

Basically the current input topic offsets and all writes to Kafka topics are done atomically if a transaction is committed. This implies, that all pending writes are flushed before the commit is done.

Transactions don't need to know about your actual business logic. They just "synchronize" the progress tracking on the input topics with writes to output topics.

I would recommend to read corresponding blog posts and watch talks about exactly-once in Kafka to get more details:

Btw: This is a question about manual commits in Streams API. You should consider this: How to commit manually with Kafka Stream?