23
votes

In Samza and Kafka Streams, data stream processing is performed in a sequence/graph (called a "dataflow graph" in Samza and a "topology" in Kafka Streams) of processing steps (called a "job" in Samza and a "processor" in Kafka Streams). I will refer to these two concepts as workflow and worker in the remainder of this question.

Let's assume that we have a very simple workflow, consisting of a worker A that consumes sensor measurements and filters out all values below 50, followed by a worker B that receives the remaining measurements and filters out all values above 80.

Input (Kafka topic X) --> (Worker A) --> (Worker B) --> Output (Kafka topic Y)
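Framework aside, the filtering logic of the two workers can be sketched in plain Java (the class and method names below are illustrative, not part of either framework's API):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FilterChain {
    // Worker A: drop all values below 50
    static boolean workerA(int v) { return v >= 50; }

    // Worker B: drop all values above 80
    static boolean workerB(int v) { return v <= 80; }

    public static void main(String[] args) {
        // Chaining the two filters keeps only values in [50, 80]
        List<Integer> out = Stream.of(10, 50, 75, 80, 99)
                .filter(FilterChain::workerA)
                .filter(FilterChain::workerB)
                .collect(Collectors.toList());
        System.out.println(out); // prints [50, 75, 80]
    }
}
```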

If I have understood correctly, both Samza and Kafka Streams use the topic partitioning concept for replicating the workflow/workers and thus parallelizing the processing for scalability purposes.

But:

  • Samza replicates each worker (i.e., job) separately to multiple tasks (one for each partition in the input stream). That is, a task is a replica of a worker of the workflow.

  • Kafka Streams replicates the whole workflow (i.e., topology) at once to multiple tasks (one for each partition in the input stream). That is, a task is a replica of the whole workflow.

This brings me to my questions:

  1. Assume that there is only one partition: Is it correct that it is not possible to deploy worker (A) and (B) on two different machines in Kafka Streams, while this is possible in Samza? (Or in other words: Is it impossible in Kafka Streams to split a single task (i.e., topology replica) across two machines, regardless of whether there are multiple partitions or not?)

  2. How do two subsequent processors in a Kafka Streams topology (in the same task) communicate? (I know that in Samza all communication between two subsequent workers (i.e., jobs) is done via Kafka topics, but since in Kafka Streams one has to explicitly "mark" in the code which streams are to be published as Kafka topics, this can't be the case here.)

  3. Is it correct that Samza also publishes all intermediate streams automatically as Kafka topics (and thus makes them available to potential clients), while Kafka Streams only publishes those intermediate and final streams that one marks explicitly (with addSink in the low-level API and to or through in the DSL)?

(I'm aware of the fact that Samza can also use message queues other than Kafka, but this is not really relevant for my questions.)

1
I have taken a look at the code and figured out that processors in a Kafka Streams topology "communicate" by recursive calls of the process(...) method. Hence, there is no buffering or messaging between two subsequent processors in a topology. The first processor just calls the process(...) method of the subsequent processor(s) and so on (see forward(...) in ProcessorContextImpl). However, this is only true if my assumption in question 1 is correct. - Lukas Probst
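The call-chain behaviour described in this comment can be mimicked with a self-contained sketch (this is not the real Kafka Streams API; the interface and factory names below are made up for illustration): forwarding a record is nothing more than a direct, synchronous method call on the downstream processor.

```java
import java.util.ArrayList;
import java.util.List;

public class ForwardingSketch {
    // Simplified stand-in for a Kafka Streams processor node
    interface Processor { void process(int value); }

    // Worker A drops values below 50 and "forwards" the rest by
    // directly calling the child's process(...) method.
    static Processor filterA(Processor child) {
        return v -> { if (v >= 50) child.process(v); };
    }

    // Worker B drops values above 80 and forwards to its child (the sink).
    static Processor filterB(Processor child) {
        return v -> { if (v <= 80) child.process(v); };
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        // Wire A -> B -> sink; no queue or buffer between the nodes
        Processor topology = filterA(filterB(sink::add));
        for (int v : new int[]{10, 50, 75, 80, 99}) topology.process(v);
        System.out.println(sink); // prints [50, 75, 80]
    }
}
```

Each input record traverses the whole chain before the next one is processed, which matches the depth-first behaviour observed in the comment.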

1 Answer

11
votes

First of all, in both Samza and Kafka Streams, you can choose whether or not to have an intermediate topic between these two tasks (processors), i.e. the topology can be either:

Input (Kafka topic X) --> (Worker A) --> (Worker B) --> Output (Kafka topic Y)

or:

Input (Kafka topic X) --> (Worker A) --> Intermediate (Kafka topic Z) --> (Worker B) --> Output (Kafka topic Y)

In either Samza or Kafka Streams, in the former case you will have to deploy Worker A and B together, while in the latter case you can deploy Worker A and B separately, since in either framework tasks only communicate through intermediate topics; there are no TCP-based communication channels.

In Samza, for the former case you need to code your two filters within one task, and for the latter case you need to specify the input and output topic for each of the tasks, e.g. for Worker A the input is X and the output is Z, for Worker B the input is Z and the output is Y, and you can start / stop the deployed workers independently.
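For the latter case, the input wiring in classic Samza lives in each job's configuration file, one file per job (the class and topic names below are illustrative placeholders; the output topic is chosen in the task code when sending, not in the config):

```properties
# worker-a.properties -- Worker A: consumes topic X,
# its task code sends results to topic Z
job.name=worker-a
task.class=example.FilterBelow50Task
task.inputs=kafka.X
```

```properties
# worker-b.properties -- Worker B: consumes topic Z,
# its task code sends results to topic Y
job.name=worker-b
task.class=example.FilterAbove80Task
task.inputs=kafka.Z
```

Because each job is configured and submitted on its own, the two workers can be deployed, scaled, and restarted independently.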

In Kafka Streams, for the former case you can just "concatenate" these processors like:

stream1.filter(..).filter(..)

and as a result, as Lukas mentioned, each result from the first filter will be immediately passed to the second filter (you can think of each input record from topic X as traversing the topology in depth-first order, with no buffering between any directly connected processors);

And for the latter case you can indicate that the intermediate stream should be "materialized" in another topic, i.e.:

stream1.filter(..).through("topicZ").filter(..)

and each result of the first filter will be sent to topic Z, which will then be pipelined to the second filter processor. In this case these two filters can potentially be deployed on different hosts, or on different threads within the same host.
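Putting the two variants side by side as a fuller DSL sketch (an untested fragment, since it needs the kafka-streams library; topic names X, Y, Z follow the diagrams above, and through is the older API used in this answer, later replaced by repartition()):

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> stream1 = builder.stream("X");

// Variant 1: both filters in one task, connected by direct method calls
stream1.filter((k, v) -> v >= 50)
       .filter((k, v) -> v <= 80)
       .to("Y");

// Variant 2: materialize the intermediate stream in topic Z,
// allowing the two filters to run in different tasks
stream1.filter((k, v) -> v >= 50)
       .through("Z")
       .filter((k, v) -> v <= 80)
       .to("Y");
```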