
I want to know if the order in which a message is processed by a stream topology is specified.

Example:

        // read input messages

        KStream<String, String> inputMessages = builder.stream("demo_input_topic_1");
        inputMessages = inputMessages.peek((k, v) -> System.out.println("TECHN. NEW MESSAGE: key: " + k + ", value: " + v));

        // check if message was already processed

        KTable<String, Long> alreadyProcessedMessages = inputMessages.groupByKey().count();
        KStream<String, String> newMessages =
                inputMessages.leftJoin(alreadyProcessedMessages, (streamValue, tableValue) -> getMessageValueOrNullIfKnownMessage(streamValue, tableValue));
        KStream<String, String> filteredNewMessages =
                newMessages.filter((key, val) -> val != null).peek((k, v) -> System.out.println("FUNC. NEW MESSAGE: key: " + k + ", value: " + v));

        // process the message

        filteredNewMessages.map((key, value) -> KeyValue.pair(key, "processed message: " + value))
                .peek((k, v) -> System.out.println("PROCESSED MESSAGE: key: " + k + ", value: " + v)).to("demo_output_topic_1");

With getMessageValueOrNullIfKnownMessage(...):

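    private static String getMessageValueOrNullIfKnownMessage(String newMessageValue, Long messageCounter) {
        // messageCounter is null if the key has no entry in the table yet (left join)
        if (messageCounter != null && messageCounter > 1) {
            return null;
        }

        return newMessageValue;
    }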

So there is only one input and one output topic in the example.

The input topic is counted in alreadyProcessedMessages (so a local state store is created). The input stream is also left-joined with this counting table, and the result of the join is the stream newMessages: a message's value in this stream is null if the count for its key is > 1; otherwise it is the original value of the message.

Then the messages of newMessages are filtered (the null values are dropped), and the result is written to the output topic.

So this minimal topology does the following: it writes every message from the input topic whose key is new (a key that has not been processed before) to the output topic.

In tests, this topology works. But I don't think that is guaranteed: it only works because a message is processed by the counting node before it is joined.

But is there a guarantee for that order?

As far as I can see in the documentation, there is no guarantee for this processing order. So when a new message arrives, this could also happen:

  • The message is processed by the "join node".
  • The message is processed by the "counting node".

This would of course produce a different result: if a message with the same key arrives for the second time, the join would still see a count of 1 (the second message has not been counted yet), so the message would keep its original value and pass the filter.
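To make the difference concrete, here is a small plain-Java model with no Kafka dependency. It is only a sketch under the assumption that the count() table behaves like a HashMap updated once per record; the class and method names are hypothetical. It replays the keys A, B, A, A once with the counting step first and once with the join step first:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Kafka dependency) of the two possible orderings.
// The Map stands in for the KTable produced by count().
public class OrderingDemo {

    // Same logic as getMessageValueOrNullIfKnownMessage, with a null check
    // because a left join yields a null table value for an unseen key.
    static String valueOrNullIfKnown(String value, Long counter) {
        if (counter != null && counter > 1) {
            return null;
        }
        return value;
    }

    // countFirst == true:  count node runs before the join node (observed in tests)
    // countFirst == false: join node runs before the count node (the feared case)
    static List<String> run(String[] keys, boolean countFirst) {
        Map<String, Long> counts = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (String key : keys) {
            String value = "v-" + key;
            if (countFirst) {
                counts.merge(key, 1L, Long::sum);
            }
            String joined = valueOrNullIfKnown(value, counts.get(key));
            if (!countFirst) {
                counts.merge(key, 1L, Long::sum);
            }
            if (joined != null) {       // the filter step
                output.add(key);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        String[] keys = {"A", "B", "A", "A"};
        System.out.println("count first: " + run(keys, true));   // count first: [A, B]
        System.out.println("join first:  " + run(keys, false));  // join first:  [A, B, A]
    }
}
```

With the counting step first, only the first A and the first B pass the filter. With the join step first, the second A still sees a count of 1 and passes as well; only the third A is dropped.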

So is the order of processing specified somewhere?

I know that in newer versions of Kafka, a KStream-KTable join takes the timestamps of the records in the input partitions into account. But this does not help here, because both sides of the join are fed from the same input partition (it is the same message).

Thank you


2 Answers


There is no guarantee. The current implementation does use a List of child nodes: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L203-L206 -- However, it is not guaranteed that the child nodes are appended to this list in the same order in which they are specified in the DSL, because there is a translation layer in between that may add the nodes in a different order. Also, the implementation may change at any point in time.

The only workaround I can think of that might work (and it is rather expensive) is to send the stream-side data through a repartition topic:
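A toy model of that mechanism may help. This is not the real ProcessorContextImpl or ProcessorNode code; it is a hypothetical Node class that forwards each record depth-first to a List of children, and the only thing that differs between the two runs is the order in which the count node and the join node were added to that list:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model, NOT the real Kafka Streams classes: each node runs its action,
// then forwards the record to its children in list order (depth-first).
public class ForwardOrderDemo {

    static final class Node {
        final BiConsumer<String, String> action;
        final List<Node> children = new ArrayList<>();

        Node(BiConsumer<String, String> action) {
            this.action = action;
        }

        void process(String key, String value) {
            action.accept(key, value);
            for (Node child : children) {   // list order decides which branch runs first
                child.process(key, value);
            }
        }
    }

    static List<String> run(boolean countNodeAddedFirst, String... keys) {
        Map<String, Long> store = new HashMap<>();  // stands in for the count() state store
        List<String> output = new ArrayList<>();
        Node count = new Node((k, v) -> store.merge(k, 1L, Long::sum));
        Node join = new Node((k, v) -> {
            Long c = store.get(k);                  // join side only looks up the table
            if (c == null || c <= 1) {
                output.add(k);
            }
        });
        Node source = new Node((k, v) -> { });
        if (countNodeAddedFirst) {
            source.children.add(count);
            source.children.add(join);
        } else {
            source.children.add(join);
            source.children.add(count);
        }
        for (String k : keys) {
            source.process(k, "v-" + k);
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(true, "A", "A"));   // [A]
        System.out.println(run(false, "A", "A"));  // [A, A]
    }
}
```

Both variants describe the same logical topology, yet the output differs, which is why depending on the insertion order of child nodes is risky.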

KStream<String, String> newMessages =
   inputMessages.through(...) // note: as of 2.6.0 release, you could use `repartition()` instead of `through()`
                .leftJoin(alreadyProcessedMessages, ...);

This way, the KTable is updated before the join is executed, because the record needs to be read back from the topic first. However, as you don't have any guarantee about when the record is read back, there might be multiple updates to the table before the join is done, which might leave you in a similar situation as before. (Also, it is somewhat expensive to reroute the data through an additional topic.)
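The caveat about multiple table updates can be illustrated with another plain-Java model (hypothetical names, no Kafka dependency). The Deque plays the role of the extra topic, and the readBackAfter parameter is an assumption standing in for how many records get counted before the stream side is read back and joined:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the through()/repartition() workaround: the count is
// updated immediately, but the join only sees a record once it is read back.
public class RepartitionDelayDemo {

    // readBackAfter: number of input records counted before the queued
    // stream-side records are drained and joined (a modeling assumption).
    static List<String> run(int readBackAfter, String... keys) {
        Map<String, Long> counts = new HashMap<>();
        Deque<String> topic = new ArrayDeque<>();
        List<String> output = new ArrayList<>();
        int sinceLastRead = 0;
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);  // table side: updated right away
            topic.addLast(key);                // stream side: written to the extra topic
            sinceLastRead++;
            if (sinceLastRead == readBackAfter) {
                drain(topic, counts, output);
                sinceLastRead = 0;
            }
        }
        drain(topic, counts, output);          // read back whatever is left
        return output;
    }

    // Reads records back from the "topic" and joins them with the current counts.
    private static void drain(Deque<String> topic, Map<String, Long> counts, List<String> output) {
        while (!topic.isEmpty()) {
            String key = topic.pollFirst();
            Long c = counts.get(key);
            if (c == null || c <= 1) {
                output.add(key);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(run(1, "A", "A"));  // read back promptly: [A]
        System.out.println(run(2, "A", "A"));  // both counted first: []
    }
}
```

If both A records are counted before either one is read back, both joins see a count of 2, so even the first A is dropped.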

Using the Processor API, you would have more control, because you can call context.forward(..., To.child(...)). However, in this case you would also need to implement the aggregation and the join manually. If you combine transform() with the DSL operators, like this:

KStream routing = inputMessages.transform(...);
routing.groupByKey(...);
routing.leftJoin(...);

you get repartition topics after transform(), which you want to avoid. Instead, implement both steps as transforms:

KStream routing = inputMessages.transform(...);
routing.transform(...); // implement the aggregation
routing.transform(...); // implement the join

Chaining transform() calls like this does not trigger automatic repartitioning.
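The benefit of doing the work inside transform() is that the order becomes part of your own code. The following plain-Java model is a simplification of that idea, not the Kafka Processor API: instead of separate aggregation and join steps, both are folded into one hypothetical process() method, and the HashMap stands in for a key-value state store:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model: aggregation and join-like lookup in one processor,
// so their order is fixed by the code rather than by the topology.
public class SingleProcessorDedup {

    private final Map<String, Long> counts = new HashMap<>();  // stands in for a state store

    // Returns the value for the first record of a key, null for repeats.
    String process(String key, String value) {
        long count = counts.merge(key, 1L, Long::sum);  // 1) aggregation: update the count
        return count > 1 ? null : value;                // 2) "join": use the updated count
    }

    public static void main(String[] args) {
        SingleProcessorDedup dedup = new SingleProcessorDedup();
        List<String> out = new ArrayList<>();
        for (String key : new String[] {"A", "B", "A"}) {
            String result = dedup.process(key, "v-" + key);
            if (result != null) {
                out.add(key + "=" + result);
            }
        }
        System.out.println(out);  // [A=v-A, B=v-B]
    }
}
```

In a real topology, the Map would be a state store attached to the transformer; that wiring is omitted here.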


This is only a partial answer that narrows down the open question:

Confluent's Stream Architecture overview states that a "depth-first processing strategy" is used to traverse the topology. It does not mention any synchronization at nodes that can be reached by the same input along multiple paths. (However, given the level of detail of that overview, it would be a stretch to rule such synchronization out on that basis alone.)

Regarding the order in which the DFS traversal takes branches, I did not find a clear statement. However, Confluent's documentation on naming operators within the topology shows the "operator's order in the topology" in several examples. One could assume that this order, which seems to be given by the order of the DSL operators in the source code, is also the execution order. That would provide the guarantee you were asking for. However, I could not confirm that assumption from any other source.

That leaves two questions that could be answered by finding the relevant source code in the Processor API (PAPI) implementation:

  1. Is it really plain DFS traversal without synchronization points?
  2. Is the branching order in the DFS really the operator order shown in the naming documentation? If not, what is it?