
I have a complex Kafka Streams application with two fully stateful flows in the same stream:

  • it uses an Execution topic as source, enriches the message, and republishes it back to the same Execution topic.
  • it joins another topic, WorkerTaskResult, adds the result to the Execution, and publishes back to the Execution topic.

The main goal is to provide a workflow system.

The detailed logic is:

  • an Execution is a list of TaskRuns
  • the Execution looks at the current state of all its TaskRuns and finds the next one to execute
  • if one is found, the Execution alters its TaskRun list, adds the next one, and publishes back to Kafka; it also sends the task to be done (WorkerTask) to another queue
  • the WorkerTask is processed outside of Kafka Streams and the result is published back to another queue (WorkerTaskResult) with a simple Kafka Consumer & Producer
  • the WorkerTaskResult alters the matching TaskRun in the current Execution, changes its status (mostly RUNNING / SUCCEED / FAILED), and is also published back to the Execution queue (with Kafka Streams)

As you can see, the Execution (with its TaskRun list) is the current state of the application.
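
To make that model concrete, here is a minimal sketch of the two objects (class and field names are illustrative, inferred from the log output below, not the actual source):

import java.util.List;

// Illustrative state model: an Execution owns the list of TaskRuns that gets mutated concurrently.
enum State { CREATED, RUNNING, SUCCESS, FAILED }   // SUCCEED is used for the execution-level terminal state

class Execution {
    String id;
    State state;
    List<TaskRun> taskRunList;
}

class TaskRun {
    String id;
    String taskId;
    Object value;
    State state;
}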

The stream works well when all the messages are sequential (no concurrency, so only one alteration of the TaskRun list can happen at a time). When the workflow becomes parallel (concurrent WorkerTaskResults can be joined), my Execution state seems to be overridden, producing a kind of rollback.

Example log output:

2020-04-20 08:05:44,830 INFO  reamThread-1 afkaExecutor Stream in with 3264792750: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=CREATED) # >>>>> t1 is created 
  ] 
)
2020-04-20 08:05:44,881 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> worker send running state
2020-04-20 08:05:44,882 INFO  reamThread-1 afkaExecutor Stream out  with 1805535461 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> t1 save the running state
  ] 
)
2020-04-20 08:05:45,047 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS) # >>>>> worker send success
2020-04-20 08:05:45,047 INFO  reamThread-1 afkaExecutor Stream out  with 578845055 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS)
  ] 
)
2020-04-20 08:05:45,153 INFO  reamThread-1 afkaExecutor Stream in with 1805535461: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> OUT OF ORDER AND ROLLBACK TO PREVIOUS VERSION
  ] 
)
2020-04-20 08:05:45,157 INFO  reamThread-1 afkaExecutor Stream out  with 1889889916 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
  ] 
)
2020-04-20 08:05:45,209 WARN  reamThread-1 KTableSource Detected out-of-order KTable update for execution at offset 10, partition 2.
2020-04-20 08:05:45,313 INFO  reamThread-1 afkaExecutor Stream in with 1889889916: (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
  ] 
)
2020-04-20 08:05:45,350 INFO  reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
2020-04-20 08:05:45,350 INFO  reamThread-1 afkaExecutor Stream out  with 3651399223 : (
  state=RUNNING
  taskRunList=
  [
    TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
    TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
    TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
  ] 
)

I get some warnings on the console with the message Detected out-of-order KTable update for execution at offset 10, partition 7.

The full source can be found here.

I also tried many different approaches, like this one here:

  • put the Execution and the WorkerTaskResult on the same topic, to be sure that only one message is processed at a time
  • and keep the last Execution myself in a StateStore (in order to join WorkerTaskResult & Execution)
  • but it sounds like I am reinventing a KTable, and it doesn't work any better

or this one here:

  • mostly the same as the previous one (keeping the last Execution myself in a StateStore)
  • but using two KStreams joined KStream-to-KStream (removing the KTable); a rough sketch of both attempts follows.
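
Roughly, both attempts looked like the sketch below (illustrative only; the topic names, serdes, and the merge() helper that applies either an Execution or a WorkerTaskResult to the stored Execution are assumptions, not the actual code):

// Illustrative: keep the latest Execution per key in a state store and merge
// every incoming message (Execution or WorkerTaskResult) into it.
builder.stream("execution_and_result", Consumed.with(Serdes.String(), messageSerde))
    .transformValues(() -> new ValueTransformerWithKey<String, Object, Execution>() {
        private KeyValueStore<String, Execution> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, Execution>) context.getStateStore("execution-store");
        }

        @Override
        public Execution transform(String key, Object message) {
            Execution updated = merge(store.get(key), message); // hypothetical helper
            store.put(key, updated);
            return updated;
        }

        @Override
        public void close() { }
    }, "execution-store")
    .to("execution", Produced.with(Serdes.String(), executionSerde));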

My questions are:

  • is this pattern (which is not a DAG flow, since we sink to the same topic) supported by Kafka Streams?
  • what is a good way to design this stream so that it is concurrency safe?

Any clue is really appreciated; I have been completely stuck for days. Thanks!

EDIT 1:
Here is some additional information:

  • Only the KStream app publishes new events to Execution; there is no outside app publishing to this topic. The only case where an external app publishes to Execution is the first event (i.e., the creation of the execution).
  • There is a WorkerApp (an external app, a simple consumer/producer) that consumes from WorkerTask (the job to be done) and publishes the result to WorkerTaskResult (mostly the current state of the task).

Here is a simplified version of the actual stream (a DSL sketch follows the diagram):

Builder 
  -> Stream 1
     - from KStream<WorkerTaskResult> 
     - join KTable<Execution>
     - to Execution topic 
  -> Stream 2 
     - from KTable<Execution> (same as previous)
     - multiple output 
       - to WorkerTaskResult topic (if found an end) 
       - to Execution & to WorkerTask topic (if found a next task)
       - to Execution topic (if detect an Execution end) 
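
In DSL terms, this roughly corresponds to the sketch below (topic names are taken from the topology shown in EDIT 2; applyResult, hasNextTask, withNextTaskRun, nextWorkerTasks, isEnded and withFinalState are hypothetical helpers standing in for the actual logic, and the "end found" branch to WorkerTaskResult is omitted for brevity):

KTable<String, Execution> executions =
    builder.table("kestra_execution", Consumed.with(Serdes.String(), executionSerde));

// Stream 1: apply each WorkerTaskResult to the current Execution and write it back.
builder.stream("kestra_workertaskresult", Consumed.with(Serdes.String(), workerTaskResultSerde))
    .join(executions, (result, execution) -> applyResult(execution, result))
    .to("kestra_execution", Produced.with(Serdes.String(), executionSerde));

// Stream 2: react to every Execution update.
KStream<String, Execution> changes = executions.toStream();
changes.filter((key, e) -> hasNextTask(e)).mapValues(e -> withNextTaskRun(e))      // next task found
       .to("kestra_execution", Produced.with(Serdes.String(), executionSerde));
changes.filter((key, e) -> hasNextTask(e)).flatMapValues(e -> nextWorkerTasks(e))  // tasks to execute
       .to("kestra_workertask", Produced.with(Serdes.String(), workerTaskSerde));
changes.filter((key, e) -> isEnded(e)).mapValues(e -> withFinalState(e))           // execution end detected
       .to("kestra_execution", Produced.with(Serdes.String(), executionSerde));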

The KStream app is mostly an executor/state app that finds the next WorkerTask to be done and evaluates whether the flow has ended, so the app can do the following (a rough sketch of this step follows the list):

  • create a new TaskRun
  • change the state of the current TaskRun
    • by joining a WorkerTaskResult, or
    • by evaluating the whole execution and finding a failed task (based on dependencies)
  • change the state of the execution and publish a final state SUCCEED or FAILED that will break the "infinite loop"
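
A rough sketch of that evaluation step (all names are illustrative, not the actual code):

// Illustrative decision step applied to every Execution update.
Execution onExecutionChange(Execution execution) {
    Optional<TaskRun> next = findNextTaskRun(execution);    // hypothetical: based on task dependencies
    if (next.isPresent()) {
        execution.taskRunList.add(next.get());               // create a new TaskRun ...
        return execution;                                     // ... and a matching WorkerTask is emitted downstream
    }
    if (anyTaskRunFailed(execution)) {                        // hypothetical
        execution.state = State.FAILED;                       // terminal state: breaks the "infinite loop"
    } else if (allTaskRunsSucceeded(execution)) {             // hypothetical
        execution.state = State.SUCCEED;
    }
    return execution;
}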

In this current version, what is really unclear to me is the meaning of Detected out-of-order KTable update in the real world. Does this mean that a KTable must have a single producer per partition and per key in order to keep order on the topic?

EDIT 2:
In the meantime, I found a new way to design the stream app that seems to be working. Unit tests are passing and there is no more Detected out-of-order warning. Here is the new flow, simplified:

Builder 
  - from KTable<Execution> 
  - leftJoin KTable<WorkerTaskResult> 
  - Branch 
    - If joined > to Execution topic
    - If not joined > continue the flow 
      - Multiple outputs (same as previous) 
        - to WorkerTaskResult topic (if found an end) 
        - to Execution & to WorkerTask topic (if found a next task)
        - to Execution topic (if detect an Execution end) 

What makes sense, I think, is (a rough DSL sketch follows the list):

  • The WorkerTaskResult is now a KTable, so I only keep the last version of the result
  • I have a single-path stream (no longer two paths) that outputs to Execution (I think this is the most important part, and what resolved the out-of-order updates)
  • The whole flow seems to have only one output per input (one new value on Execution will produce one new value on the Execution topic)
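
In DSL terms, the new flow looks roughly like this (Pair is a hypothetical holder for the joined values; the branch predicates and the applyResult/evaluate helpers are illustrative, and the second branch is abbreviated since it also fans out to kestra_workertask and kestra_workertaskresult as before):

KTable<String, Execution> executions =
    builder.table("kestra_execution", Consumed.with(Serdes.String(), executionSerde));
KTable<String, WorkerTaskResult> results =
    builder.table("kestra_workertaskresult", Consumed.with(Serdes.String(), workerTaskResultSerde));

// Single path: every Execution update is left-joined with the latest WorkerTaskResult.
KStream<String, Pair> joined = executions
    .leftJoin(results, (execution, result) -> new Pair(execution, result))
    .toStream();

KStream<String, Pair>[] branches = joined.branch(
    (key, pair) -> pair.result != null,   // "if joined": a result to apply (simplified)
    (key, pair) -> true                   // "if not joined": continue the evaluation
);

branches[0].mapValues(pair -> applyResult(pair.execution, pair.result))
           .to("kestra_execution", Produced.with(Serdes.String(), executionSerde));

branches[1].flatMapValues(pair -> evaluate(pair.execution))   // next task, end detection, ...
           .to("kestra_execution", Produced.with(Serdes.String(), executionSerde));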

Here is the new topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [kestra_execution])
      --> KTABLE-SOURCE-0000000001
    Processor: KTABLE-SOURCE-0000000001 (stores: [execution])
      --> KTABLE-TOSTREAM-0000000002, KTABLE-JOINTHIS-0000000007
      <-- KSTREAM-SOURCE-0000000000
    Source: KSTREAM-SOURCE-0000000004 (topics: [kestra_workertaskresult])
      --> KTABLE-SOURCE-0000000005
    Processor: KTABLE-SOURCE-0000000005 (stores: [workertaskresult])
      --> KTABLE-JOINOTHER-0000000008
      <-- KSTREAM-SOURCE-0000000004
    Processor: KTABLE-JOINOTHER-0000000008 (stores: [execution])
      --> KTABLE-MERGE-0000000006
      <-- KTABLE-SOURCE-0000000005
    Processor: KTABLE-JOINTHIS-0000000007 (stores: [workertaskresult])
      --> KTABLE-MERGE-0000000006
      <-- KTABLE-SOURCE-0000000001
    Processor: KTABLE-MERGE-0000000006 (stores: [])
      --> KTABLE-TOSTREAM-0000000009
      <-- KTABLE-JOINTHIS-0000000007, KTABLE-JOINOTHER-0000000008
    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])
      --> KSTREAM-FILTER-0000000010, KSTREAM-FILTER-0000000015
      <-- KTABLE-MERGE-0000000006
    Processor: KSTREAM-FILTER-0000000015 (stores: [])
      --> KSTREAM-MAPVALUES-0000000016
      <-- KTABLE-TOSTREAM-0000000009
    Processor: KSTREAM-MAPVALUES-0000000016 (stores: [])
      --> KSTREAM-MAPVALUES-0000000017
      <-- KSTREAM-FILTER-0000000015
    Processor: KSTREAM-MAPVALUES-0000000017 (stores: [])
      --> KSTREAM-FLATMAPVALUES-0000000018, KSTREAM-FILTER-0000000024, KSTREAM-FILTER-0000000019, KSTREAM-MAPVALUES-0000000067
      <-- KSTREAM-MAPVALUES-0000000016
    Processor: KSTREAM-FLATMAPVALUES-0000000018 (stores: [])
      --> KSTREAM-FILTER-0000000042, KSTREAM-FILTER-0000000055, KSTREAM-FILTER-0000000030
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000042 (stores: [])
      --> KSTREAM-MAPVALUES-0000000043
      <-- KSTREAM-FLATMAPVALUES-0000000018
    Processor: KSTREAM-FILTER-0000000030 (stores: [])
      --> KSTREAM-MAPVALUES-0000000031
      <-- KSTREAM-FLATMAPVALUES-0000000018
    Processor: KSTREAM-FILTER-0000000055 (stores: [])
      --> KSTREAM-MAPVALUES-0000000056
      <-- KSTREAM-FLATMAPVALUES-0000000018
    Processor: KSTREAM-MAPVALUES-0000000043 (stores: [])
      --> KSTREAM-FILTER-0000000044, KSTREAM-FILTER-0000000050
      <-- KSTREAM-FILTER-0000000042
    Processor: KSTREAM-MAPVALUES-0000000031 (stores: [])
      --> KSTREAM-FILTER-0000000032, KSTREAM-FILTER-0000000038
      <-- KSTREAM-FILTER-0000000030
    Processor: KSTREAM-MAPVALUES-0000000056 (stores: [])
      --> KSTREAM-FILTER-0000000063, KSTREAM-FILTER-0000000057
      <-- KSTREAM-FILTER-0000000055
    Processor: KSTREAM-FILTER-0000000024 (stores: [])
      --> KSTREAM-MAPVALUES-0000000025
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000032 (stores: [])
      --> KSTREAM-MAPVALUES-0000000033
      <-- KSTREAM-MAPVALUES-0000000031
    Processor: KSTREAM-FILTER-0000000044 (stores: [])
      --> KSTREAM-MAPVALUES-0000000045
      <-- KSTREAM-MAPVALUES-0000000043
    Processor: KSTREAM-FILTER-0000000057 (stores: [])
      --> KSTREAM-MAPVALUES-0000000058
      <-- KSTREAM-MAPVALUES-0000000056
    Processor: KSTREAM-FILTER-0000000010 (stores: [])
      --> KSTREAM-MAPVALUES-0000000011
      <-- KTABLE-TOSTREAM-0000000009
    Processor: KSTREAM-FILTER-0000000019 (stores: [])
      --> KSTREAM-MAPVALUES-0000000020
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000050 (stores: [])
      --> KSTREAM-MAPVALUES-0000000051
      <-- KSTREAM-MAPVALUES-0000000043
    Processor: KSTREAM-MAPVALUES-0000000025 (stores: [])
      --> KSTREAM-FILTER-0000000026
      <-- KSTREAM-FILTER-0000000024
    Processor: KSTREAM-MAPVALUES-0000000033 (stores: [])
      --> KSTREAM-MAPVALUES-0000000034
      <-- KSTREAM-FILTER-0000000032
    Processor: KSTREAM-MAPVALUES-0000000045 (stores: [])
      --> KSTREAM-MAPVALUES-0000000046
      <-- KSTREAM-FILTER-0000000044
    Processor: KSTREAM-MAPVALUES-0000000058 (stores: [])
      --> KSTREAM-MAPVALUES-0000000059
      <-- KSTREAM-FILTER-0000000057
    Processor: KSTREAM-FILTER-0000000026 (stores: [])
      --> KSTREAM-FILTER-0000000027
      <-- KSTREAM-MAPVALUES-0000000025
    Processor: KSTREAM-FILTER-0000000038 (stores: [])
      --> KSTREAM-MAPVALUES-0000000039
      <-- KSTREAM-MAPVALUES-0000000031
    Processor: KSTREAM-FILTER-0000000063 (stores: [])
      --> KSTREAM-MAPVALUES-0000000064
      <-- KSTREAM-MAPVALUES-0000000056
    Processor: KSTREAM-MAPVALUES-0000000011 (stores: [])
      --> KSTREAM-FILTER-0000000012
      <-- KSTREAM-FILTER-0000000010
    Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
      --> KSTREAM-FILTER-0000000021
      <-- KSTREAM-FILTER-0000000019
    Processor: KSTREAM-MAPVALUES-0000000034 (stores: [])
      --> KSTREAM-FILTER-0000000035
      <-- KSTREAM-MAPVALUES-0000000033
    Processor: KSTREAM-MAPVALUES-0000000046 (stores: [])
      --> KSTREAM-FILTER-0000000047
      <-- KSTREAM-MAPVALUES-0000000045
    Processor: KSTREAM-MAPVALUES-0000000051 (stores: [])
      --> KSTREAM-FILTER-0000000052
      <-- KSTREAM-FILTER-0000000050
    Processor: KSTREAM-MAPVALUES-0000000059 (stores: [])
      --> KSTREAM-FILTER-0000000060
      <-- KSTREAM-MAPVALUES-0000000058
    Processor: KSTREAM-MAPVALUES-0000000067 (stores: [])
      --> KSTREAM-FILTER-0000000068
      <-- KSTREAM-MAPVALUES-0000000017
    Processor: KSTREAM-FILTER-0000000012 (stores: [])
      --> KSTREAM-PEEK-0000000013
      <-- KSTREAM-MAPVALUES-0000000011
    Processor: KSTREAM-FILTER-0000000021 (stores: [])
      --> KSTREAM-PEEK-0000000022
      <-- KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-FILTER-0000000027 (stores: [])
      --> KSTREAM-PEEK-0000000028
      <-- KSTREAM-FILTER-0000000026
    Processor: KSTREAM-FILTER-0000000035 (stores: [])
      --> KSTREAM-PEEK-0000000036
      <-- KSTREAM-MAPVALUES-0000000034
    Processor: KSTREAM-FILTER-0000000047 (stores: [])
      --> KSTREAM-PEEK-0000000048
      <-- KSTREAM-MAPVALUES-0000000046
    Processor: KSTREAM-FILTER-0000000052 (stores: [])
      --> KSTREAM-PEEK-0000000053
      <-- KSTREAM-MAPVALUES-0000000051
    Processor: KSTREAM-FILTER-0000000060 (stores: [])
      --> KSTREAM-PEEK-0000000061
      <-- KSTREAM-MAPVALUES-0000000059
    Processor: KSTREAM-FILTER-0000000068 (stores: [])
      --> KSTREAM-PEEK-0000000069
      <-- KSTREAM-MAPVALUES-0000000067
    Processor: KSTREAM-MAPVALUES-0000000039 (stores: [])
      --> KSTREAM-FILTER-0000000040
      <-- KSTREAM-FILTER-0000000038
    Processor: KSTREAM-MAPVALUES-0000000064 (stores: [])
      --> KSTREAM-TRANSFORM-0000000065
      <-- KSTREAM-FILTER-0000000063
    Processor: KSTREAM-FILTER-0000000040 (stores: [])
      --> KSTREAM-SINK-0000000041
      <-- KSTREAM-MAPVALUES-0000000039
    Processor: KSTREAM-PEEK-0000000013 (stores: [])
      --> KSTREAM-SINK-0000000014
      <-- KSTREAM-FILTER-0000000012
    Processor: KSTREAM-PEEK-0000000022 (stores: [])
      --> KSTREAM-SINK-0000000023
      <-- KSTREAM-FILTER-0000000021
    Processor: KSTREAM-PEEK-0000000028 (stores: [])
      --> KSTREAM-SINK-0000000029
      <-- KSTREAM-FILTER-0000000027
    Processor: KSTREAM-PEEK-0000000036 (stores: [])
      --> KSTREAM-SINK-0000000037
      <-- KSTREAM-FILTER-0000000035
    Processor: KSTREAM-PEEK-0000000048 (stores: [])
      --> KSTREAM-SINK-0000000049
      <-- KSTREAM-FILTER-0000000047
    Processor: KSTREAM-PEEK-0000000053 (stores: [])
      --> KSTREAM-SINK-0000000054
      <-- KSTREAM-FILTER-0000000052
    Processor: KSTREAM-PEEK-0000000061 (stores: [])
      --> KSTREAM-SINK-0000000062
      <-- KSTREAM-FILTER-0000000060
    Processor: KSTREAM-PEEK-0000000069 (stores: [])
      --> KSTREAM-SINK-0000000070
      <-- KSTREAM-FILTER-0000000068
    Processor: KSTREAM-TRANSFORM-0000000065 (stores: [workertask_deduplication])
      --> KSTREAM-SINK-0000000066
      <-- KSTREAM-MAPVALUES-0000000064
    Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
      --> log-executionStream
      <-- KTABLE-SOURCE-0000000001
    Sink: KSTREAM-SINK-0000000014 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000013
    Sink: KSTREAM-SINK-0000000023 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000022
    Sink: KSTREAM-SINK-0000000029 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000028
    Sink: KSTREAM-SINK-0000000037 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000036
    Sink: KSTREAM-SINK-0000000041 (topic: kestra_workertaskresult)
      <-- KSTREAM-FILTER-0000000040
    Sink: KSTREAM-SINK-0000000049 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000048
    Sink: KSTREAM-SINK-0000000054 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000053
    Sink: KSTREAM-SINK-0000000062 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000061
    Sink: KSTREAM-SINK-0000000066 (topic: kestra_workertask)
      <-- KSTREAM-TRANSFORM-0000000065
    Sink: KSTREAM-SINK-0000000070 (topic: kestra_execution)
      <-- KSTREAM-PEEK-0000000069
    Processor: log-executionStream (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000002

For now, it is unclear to me whether this solution will be resilient to any concurrency, and whether I can hit an out-of-order update again (which would mean the execution is rolled back to a previous version, leading to multiple executions of the same task).


1 Answer


is this pattern (which is not a DAG flow, since we sink to the same topic) supported by Kafka Streams?

In general, yes. You just need to make sure that you don't end up with an "infinite loop", i.e., at some point an input record should "terminate" and not produce anything to the output topic any longer. For your case, an Execution should eventually stop creating new Tasks (via the feedback loop).
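
For example (illustrative only; the predicate and variable names are assumptions), the feedback branch could drop Executions that have reached a terminal state before writing back, which guarantees that every execution eventually stops looping:

executionUpdates
    .filter((key, execution) -> !execution.getState().isTerminal())   // hypothetical predicate
    .to("kestra_execution", Produced.with(Serdes.String(), executionSerde));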

what is a good way to design this stream to be concurrency safe

It always depends on the concrete application... For your case, if I understand the design of your application correctly, you basically have two input topics (Execution and WorkerTaskResult) and two output topics (Execution and WorkerTask). When processing the input topics, messages from each input may modify shared state (i.e., a task's state).

Additionally, there is an "outside application" that reads from the WorkerTask topic and writes to the WorkerTaskResult topic? Hence, there is actually a second loop in your overall data flow? I assume that there are other upstream applications that will actually push new data into the Execution topic, too?

                             +-----------------+
                             |                 |
                             v                 |
upstream producers ---> "Execution" --+        |
                                      |        |
                                      v        |  
                                      KS-App --+
                                      ^        |
                                      |        |
            +--> "WorkerTaskResult" --+        +--> "WorkerTask" --+
            |                                                      |
            +------------------------ outside app <----------------+

What is unclear to me atm:

  • which state changes are propagated from KS-App directly back to Execution?
  • which state changes are propagated from the "outside app" via WorkerTaskResult?

Maybe you can update your question and I can try to update my answer accordingly.

Update (based on Edit 1 & 2)

to Execution & to WorkerTask topic (if found a next task)

This step seems to introduce the race condition? When writing back to the Execution topic, you update the state when it's read back. In parallel, the execution of the task might finish first (i.e., before the Execution update was re-read and processed), and thus a second Execution update (when the task finished) could be written to update the state first?

In this current version, what is really unclear to me is the meaning of Detected out-of-order KTable update in the real world. Does this mean that a KTable must have a single producer per partition and per key in order to keep order on the topic?

You could say that. For each input record, the table() operator compares the timestamp of the input record to the timestamp of the current entry in the table. If the input record has a smaller timestamp, the WARN is logged (the update is still applied): the reason for the WARN is that the table only stores one entry per key and expects to only move forward in time. If there are out-of-order updates, this may lead to unexpected results, hence the WARN log. Using a single producer per partition OR a single producer per key would avoid out-of-order data per key (assuming that the producer only sends ordered data).

I am not 100% sure atm that I fully understand the new version of your app, but in general, you want to make sure to avoid a data race and to linearize the updates to Execution.