I want to know whether the order in which a message is processed by the nodes of a Kafka Streams topology is specified.
Example:
// read input messages
KStream<String, String> inputMessages = builder.stream("demo_input_topic_1");
inputMessages = inputMessages.peek((k, v) -> System.out.println("TECHN. NEW MESSAGE: key: " + k + ", value: " + v));
// check if message was already processed
KTable<String, Long> alreadyProcessedMessages = inputMessages.groupByKey().count();
KStream<String, String> newMessages = inputMessages.leftJoin(
        alreadyProcessedMessages,
        (streamValue, tableValue) -> getMessageValueOrNullIfKnownMessage(streamValue, tableValue));
KStream<String, String> filteredNewMessages = newMessages
        .filter((key, val) -> val != null)
        .peek((k, v) -> System.out.println("FUNC. NEW MESSAGE: key: " + k + ", value: " + v));
// process the message
filteredNewMessages
        .map((key, value) -> KeyValue.pair(key, "processed message: " + value))
        .peek((k, v) -> System.out.println("PROCESSED MESSAGE: key: " + k + ", value: " + v))
        .to("demo_output_topic_1");
With getMessageValueOrNullIfKnownMessage(...):
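private static String getMessageValueOrNullIfKnownMessage(String newMessageValue, Long messageCounter) {
    // A left join passes null when the table has no entry for the key yet,
    // so guard against unboxing a null counter.
    if (messageCounter != null && messageCounter > 1) {
        return null;
    }
    return newMessageValue;
}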
So there is only one input topic and one output topic in the example.
The input topic is counted in alreadyProcessedMessages (which creates local state). The input topic is also joined with the counting table alreadyProcessedMessages, and the result of the join is the stream newMessages (the value of a message in this stream is null if the message count is > 1; otherwise it is the original value of the message).
The messages of newMessages are then filtered (the null values are dropped), and the result is written to the output topic.
So what this minimal topology does: it writes every message from the input topic that has a new key (a key that has not been processed before) to the output topic.
In tests, this topology works. But I think that is not guaranteed: it only works because a message is processed by the counting node before it is joined.
But is there a guarantee for that order?
As far as I can see from the documentation, there is no guarantee for this processing order. So when a new message arrives, this could also happen:
- The message is processed by the "join node".
- The message is processed by the "counting node".
This would of course produce a different result: when a message with an already-seen key arrives for the second time, it would still be joined with its original value, because it has not been counted yet.
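To make the difference concrete, here is a minimal plain-Java sketch of the two orderings (no Kafka involved). It is only an illustration of the concern, not how Kafka Streams actually schedules nodes: the class name ProcessingOrderSketch, the run method, and the countFirst flag are made up for this example, a HashMap stands in for the state behind the KTable, and each message's key doubles as its value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProcessingOrderSketch {

    // Same rule as getMessageValueOrNullIfKnownMessage, with a null guard
    // for keys that have no count yet (left-join semantics).
    static String valueOrNullIfKnown(String value, Long count) {
        if (count != null && count > 1) {
            return null;
        }
        return value;
    }

    // Hypothetical simulation: countFirst decides whether the "counting node"
    // or the "join node" sees each message first.
    static List<String> run(List<String> keys, boolean countFirst) {
        Map<String, Long> counts = new HashMap<>(); // stands in for the KTable state
        List<String> forwarded = new ArrayList<>();
        for (String key : keys) {
            if (countFirst) {
                counts.merge(key, 1L, Long::sum);
            }
            // "Join": look up the current count for the key.
            String joined = valueOrNullIfKnown(key, counts.get(key));
            if (!countFirst) {
                counts.merge(key, 1L, Long::sum);
            }
            // "Filter": drop null values.
            if (joined != null) {
                forwarded.add(joined);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "a", "a", "b");
        System.out.println("count first: " + run(keys, true));  // prints "count first: [a, b]"
        System.out.println("join first:  " + run(keys, false)); // prints "join first:  [a, a, b]"
    }
}
```

With the count applied first, only the first message per key is forwarded. With the join applied first, the second message for key "a" still gets through, because the table only holds a count of 1 at the moment of the lookup.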
So is the order of processing specified somewhere?
I know that in newer versions of Kafka, a KStream-KTable join is performed based on the timestamps of the messages in the input partitions. But this does not help here, because both sides of the join read from the same input partition (it is the same message).
Thank you