While using the Kafka Streams Processor API, I do something like this:
```java
context.forward(key, value);
context.commit();
```
Specifically, every minute I forward the aggregated state from a state store to the sink, using a punctuator registered with `context.schedule()` in the `init()` method. What I don't understand is the following.
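For reference, the setup described above can be modeled without any Kafka dependency. This is a hypothetical, simplified stand-in, not Kafka Streams code: `PunctuatedAggregatorModel`, `process`, `punctuate` and the `sink` list are illustrative names playing the roles of `Processor.process()`, the punctuator registered with `context.schedule()`, and `context.forward()`. Summing per key is an assumed placeholder for the specific aggregation logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, dependency-free model of the processor described above (NOT Kafka Streams code).
public class PunctuatedAggregatorModel {
    // Stand-in for a key-value state store: one running aggregate per key.
    private final Map<String, Long> store = new TreeMap<>();
    // Stand-in for the downstream sink that context.forward() sends to.
    private final List<String> sink = new ArrayList<>();

    // Plays the role of Processor.process(): called once per input record, in arrival order,
    // with keys interleaved arbitrarily. Summing is a placeholder for the real aggregation logic.
    public void process(String key, long value) {
        store.merge(key, value, Long::sum);
    }

    // Plays the role of the punctuator registered via context.schedule() (every minute in the
    // question): forward every current aggregate; the real code would then call context.commit().
    public void punctuate() {
        for (Map.Entry<String, Long> e : store.entrySet()) {
            sink.add(e.getKey() + "=" + e.getValue());
        }
    }

    public List<String> sink() {
        return sink;
    }

    public static void main(String[] args) {
        PunctuatedAggregatorModel p = new PunctuatedAggregatorModel();
        p.process("a", 1);
        p.process("b", 5);
        p.process("a", 2);
        p.punctuate();
        System.out.println(p.sink()); // prints [a=3, b=5]
    }
}
```

Each emitted entry, such as `a=3`, is derived from several input records that arrived interleaved with other keys, which is exactly the situation in question.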
The [key, value] pair that I forward and then commit is read from the state store. It is aggregated, according to my own logic, from many non-sequential input [key, value] pairs: each output pair combines several unordered input pairs from the input Kafka topic. So I don't understand how the Kafka cluster and the Kafka Streams library can know the correlation between the original input pairs and the output pair that is eventually sent out.
How can this be wrapped in a transaction (fail-safe) if Kafka doesn't know the connection between the input pairs and the output pair? And what is actually being committed when I call `context.commit()`?
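One way to picture this, assuming `processing.guarantee` is set to an exactly-once mode: Kafka Streams does not need record-level lineage between inputs and outputs. A commit makes the task's whole position durable as one unit (consumed input offsets, state store changes written to the changelog topic, and forwarded output records) inside a single Kafka transaction. After a failure, the store is restored to the last commit and input is re-read from the committed offset, so the same aggregates are recomputed. `context.commit()` only requests a commit, which Streams performs at its next opportunity. With the default at-least-once guarantee there is no transaction, so a crash between forwarding and committing can produce duplicates. The toy model below is plain Java, not Kafka code; `AtomicCommitModel`, `Snapshot` and all methods are hypothetical names, summing is a placeholder aggregation, and the `record` requires Java 16+.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (NOT Kafka code): input offset, store contents and output are committed together.
public class AtomicCommitModel {
    // Everything a commit makes durable, captured as one snapshot.
    private record Snapshot(int inputOffset, Map<String, Long> store, List<String> output) {}

    private final List<String[]> input; // the input topic: [key, value] records
    private Snapshot committed = new Snapshot(0, new TreeMap<>(), new ArrayList<>());

    // Working (uncommitted) state, like an open transaction.
    private int offset;
    private Map<String, Long> store;
    private List<String> pendingOutput;

    public AtomicCommitModel(List<String[]> input) {
        this.input = input;
        restore();
    }

    // After a crash: discard uncommitted work and resume from the last committed snapshot.
    public void restore() {
        offset = committed.inputOffset();
        store = new TreeMap<>(committed.store());
        pendingOutput = new ArrayList<>();
    }

    // Consume up to n records, aggregating by key (sum is a placeholder for the real logic).
    public void processNext(int n) {
        for (int i = 0; i < n && offset < input.size(); i++, offset++) {
            String[] r = input.get(offset);
            store.merge(r[0], Long.parseLong(r[1]), Long::sum);
        }
    }

    // Scheduled emission: forward every aggregate into the pending (uncommitted) output.
    public void punctuate() {
        store.forEach((k, v) -> pendingOutput.add(k + "=" + v));
    }

    // Commit: offset, store and output become durable together; no input-to-output mapping needed.
    public void commit() {
        List<String> out = new ArrayList<>(committed.output());
        out.addAll(pendingOutput);
        committed = new Snapshot(offset, new TreeMap<>(store), out);
        pendingOutput = new ArrayList<>();
    }

    public List<String> committedOutput() {
        return committed.output();
    }

    public int committedOffset() {
        return committed.inputOffset();
    }
}
```

For example, with input `a=1, b=5, a=2, b=1`: process two records, punctuate and commit; process the last two and punctuate, then simulate a crash with `restore()`. Re-processing the last two records recomputes `a=3` and `b=6` from the restored store, and because the uncommitted output was discarded together with the uncommitted offset and state, the committed output contains them only once.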
Thanks!