I've a complex Kafka Stream application with 2 flows fully stateful in the same stream :
- it use a
Execution
topic as source, enhanced the message and republished back to the sameExecution
topic. - it join another topic
WorkerTaskResult
, add the result toExecution
and published back toExecution
Topic.
The main goal is to provide a workflow system.
The detailled logic are :
- an Execution is a list of TaskRun
- the
Execution
look at all the current state of allTaskRun
and find the next one to execute - If any is found, Execution alter their
TaskRunsList
and add the next one and publish back to Kafka, also it send to another queue the task to be done (WorkerTask
) - the
WorkerTask
is proceed outside of the Kafka stream and publish back to another queue (WorkerTaskResult
) with a simple Kafka Consumer & Producer - the
WorkerTaskResult
alter currentTaskRun
in the currentExecution
and changed the status (mostly RUNNING / SUCCEED / FAILED) and also published back toExecution
queue (with Kafka Stream)
As you can see, the Execution
(with TaskRun
list) is the state are current application.
The stream works well when all the message are sequential (no concurrency, I can only have one alter of TaskRun
list at the same time). When the workflow became Parallel (concurrent WorkerTaskResult
can be join), it seems that my Execution state is override and produce a kind of roolback.
Example log output:
2020-04-20 08:05:44,830 INFO reamThread-1 afkaExecutor Stream in with 3264792750: (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=CREATED) # >>>>> t1 is created
]
)
2020-04-20 08:05:44,881 INFO reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> worker send running state
2020-04-20 08:05:44,882 INFO reamThread-1 afkaExecutor Stream out with 1805535461 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> t1 save the running state
]
)
2020-04-20 08:05:45,047 INFO reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS) # >>>>> worker send success
2020-04-20 08:05:45,047 INFO reamThread-1 afkaExecutor Stream out with 578845055 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=SUCCESS)
]
)
2020-04-20 08:05:45,153 INFO reamThread-1 afkaExecutor Stream in with 1805535461: (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING) # >>>>> OUT OF ORDER AND ROLLBACK TO PREVIOUS VERSION
]
)
2020-04-20 08:05:45,157 INFO reamThread-1 afkaExecutor Stream out with 1889889916 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
]
)
2020-04-20 08:05:45,209 WARN reamThread-1 KTableSource Detected out-of-order KTable update for execution at offset 10, partition 2.
2020-04-20 08:05:45,313 INFO reamThread-1 afkaExecutor Stream in with 1889889916: (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=CREATED)
]
)
2020-04-20 08:05:45,350 INFO reamThread-1 afkaExecutor WorkerTaskResult: TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
2020-04-20 08:05:45,350 INFO reamThread-1 afkaExecutor Stream out with 3651399223 : (
state=RUNNING
taskRunList=
[
TaskRun(id=6FiJ3US6jqZbtU3JL2AZD6, taskId=parent, value=null, state=RUNNING),
TaskRun(id=75mtoz5KVRydOo3VJnX68s, taskId=t1, value=null, state=RUNNING),
TaskRun(id=6k23oBXy9cD0uCJeZ20SpB, taskId=t2, value=null, state=RUNNING)
]
)
I've some warning on console with message Detected out-of-order KTable update for execution at offset 10, partition 7.
The full source can be be found here.
If try also with many different approach, like this one here :
- put the
Execution
and theWorkerTaskResult
on the same topic to be sure to have the same only 1 message processed at the same time - and keep the last
Execution
by myself on a StateStore (in order to joinWorkerTaskResult
&Execution
) - but it sound like I revinvente a KTable and this don't work better
or this one here :
- mostly the same than previous (keep the last
Execution
by myself on a StateStore) - but using 2 KStream to KStream (removing the KTable).
My question is :
- is this pattern (that is not a dag flow as we sink on the same topic) are supported by KafkaStreams ?
- what is the good way to design this stream to be concurrency safe ?
Any clue are really appreciate, completely stuck since days, thanks
EDIT 1 :
Here is a some additional information :
- Only KStream app publish new event to
Execution
, there is no outside app that publish on this topic, the only case an external app published to Execution is the first Event (aka the creation of the execution). - There is an
WorkerApp
(external app, simple consumer / producer) that consumed fromWorkerTask
(job to be done) and publish result onWorkerTaskResult
(mostly the current state of the application).
Here is a simplified version of the actual stream :
Builder
-> Stream 1
- from KStream<WorkerTaskResult>
- join KTable<Execution>
- to Execution topic
-> Stream 2
- from KTable<Execution> (same than previous)
- multiple output
- to WorkerTaskResult topic (if found an end)
- to Execution & to WorkerTask topic (if found a next task)
- to Execution topic (if detect an Execution end)
The KStream is mostly an Executor State App that find what is the next WorkerTask
to be done and evaluate if the flow is ended, so the app can :
- create new
TaskRun
- change state of current
TaskRun
- joining
WorkerTaskResult
or - evaluate the whole execution and found a task failed (based on dependencies)
- joining
- change state of execution and publish a final state SUCCEED or FAILED that will break the "infinite loop"
On this actual version, what is really unclear for me is what is the meaning of Detected out-of-order KTable update
in a real world?
Does this mean that a KTable must have a single producer per partition and per key in order to keep order on the topic ?
EDIT 2 :
In a meantime, i found a new way to think the stream app that seems to be working. Unit test are passing and no more Detected out-of-order
.
Here is the new flow simplified :
Builder
- from KTable<Execution>
- leftJoin KTable<WorkerTaskResult>
- Branch
- If Join > to Execution topic
- If not joint > continue the flow
- Multiple output (same than previous)
- to WorkerTaskResult topic (if found an end)
- to Execution & to WorkerTask topic (if found a next task)
- to Execution topic (if detect an Execution end)
What make sense I think is :
- The
WorkerTaskResult
is now a KTable, so I only keep the last version of the result - I've a single path stream (and no more 2 path) that output to
Execution
(I think this is the most important part that resolved out-of-order) - The whole seems to have only one output per input (1 new value on
Execution
will produce 1 new value onExecution
topic)
here is the new topology :
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [kestra_execution])
--> KTABLE-SOURCE-0000000001
Processor: KTABLE-SOURCE-0000000001 (stores: [execution])
--> KTABLE-TOSTREAM-0000000002, KTABLE-JOINTHIS-0000000007
<-- KSTREAM-SOURCE-0000000000
Source: KSTREAM-SOURCE-0000000004 (topics: [kestra_workertaskresult])
--> KTABLE-SOURCE-0000000005
Processor: KTABLE-SOURCE-0000000005 (stores: [workertaskresult])
--> KTABLE-JOINOTHER-0000000008
<-- KSTREAM-SOURCE-0000000004
Processor: KTABLE-JOINOTHER-0000000008 (stores: [execution])
--> KTABLE-MERGE-0000000006
<-- KTABLE-SOURCE-0000000005
Processor: KTABLE-JOINTHIS-0000000007 (stores: [workertaskresult])
--> KTABLE-MERGE-0000000006
<-- KTABLE-SOURCE-0000000001
Processor: KTABLE-MERGE-0000000006 (stores: [])
--> KTABLE-TOSTREAM-0000000009
<-- KTABLE-JOINTHIS-0000000007, KTABLE-JOINOTHER-0000000008
Processor: KTABLE-TOSTREAM-0000000009 (stores: [])
--> KSTREAM-FILTER-0000000010, KSTREAM-FILTER-0000000015
<-- KTABLE-MERGE-0000000006
Processor: KSTREAM-FILTER-0000000015 (stores: [])
--> KSTREAM-MAPVALUES-0000000016
<-- KTABLE-TOSTREAM-0000000009
Processor: KSTREAM-MAPVALUES-0000000016 (stores: [])
--> KSTREAM-MAPVALUES-0000000017
<-- KSTREAM-FILTER-0000000015
Processor: KSTREAM-MAPVALUES-0000000017 (stores: [])
--> KSTREAM-FLATMAPVALUES-0000000018, KSTREAM-FILTER-0000000024, KSTREAM-FILTER-0000000019, KSTREAM-MAPVALUES-0000000067
<-- KSTREAM-MAPVALUES-0000000016
Processor: KSTREAM-FLATMAPVALUES-0000000018 (stores: [])
--> KSTREAM-FILTER-0000000042, KSTREAM-FILTER-0000000055, KSTREAM-FILTER-0000000030
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000042 (stores: [])
--> KSTREAM-MAPVALUES-0000000043
<-- KSTREAM-FLATMAPVALUES-0000000018
Processor: KSTREAM-FILTER-0000000030 (stores: [])
--> KSTREAM-MAPVALUES-0000000031
<-- KSTREAM-FLATMAPVALUES-0000000018
Processor: KSTREAM-FILTER-0000000055 (stores: [])
--> KSTREAM-MAPVALUES-0000000056
<-- KSTREAM-FLATMAPVALUES-0000000018
Processor: KSTREAM-MAPVALUES-0000000043 (stores: [])
--> KSTREAM-FILTER-0000000044, KSTREAM-FILTER-0000000050
<-- KSTREAM-FILTER-0000000042
Processor: KSTREAM-MAPVALUES-0000000031 (stores: [])
--> KSTREAM-FILTER-0000000032, KSTREAM-FILTER-0000000038
<-- KSTREAM-FILTER-0000000030
Processor: KSTREAM-MAPVALUES-0000000056 (stores: [])
--> KSTREAM-FILTER-0000000063, KSTREAM-FILTER-0000000057
<-- KSTREAM-FILTER-0000000055
Processor: KSTREAM-FILTER-0000000024 (stores: [])
--> KSTREAM-MAPVALUES-0000000025
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000032 (stores: [])
--> KSTREAM-MAPVALUES-0000000033
<-- KSTREAM-MAPVALUES-0000000031
Processor: KSTREAM-FILTER-0000000044 (stores: [])
--> KSTREAM-MAPVALUES-0000000045
<-- KSTREAM-MAPVALUES-0000000043
Processor: KSTREAM-FILTER-0000000057 (stores: [])
--> KSTREAM-MAPVALUES-0000000058
<-- KSTREAM-MAPVALUES-0000000056
Processor: KSTREAM-FILTER-0000000010 (stores: [])
--> KSTREAM-MAPVALUES-0000000011
<-- KTABLE-TOSTREAM-0000000009
Processor: KSTREAM-FILTER-0000000019 (stores: [])
--> KSTREAM-MAPVALUES-0000000020
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000050 (stores: [])
--> KSTREAM-MAPVALUES-0000000051
<-- KSTREAM-MAPVALUES-0000000043
Processor: KSTREAM-MAPVALUES-0000000025 (stores: [])
--> KSTREAM-FILTER-0000000026
<-- KSTREAM-FILTER-0000000024
Processor: KSTREAM-MAPVALUES-0000000033 (stores: [])
--> KSTREAM-MAPVALUES-0000000034
<-- KSTREAM-FILTER-0000000032
Processor: KSTREAM-MAPVALUES-0000000045 (stores: [])
--> KSTREAM-MAPVALUES-0000000046
<-- KSTREAM-FILTER-0000000044
Processor: KSTREAM-MAPVALUES-0000000058 (stores: [])
--> KSTREAM-MAPVALUES-0000000059
<-- KSTREAM-FILTER-0000000057
Processor: KSTREAM-FILTER-0000000026 (stores: [])
--> KSTREAM-FILTER-0000000027
<-- KSTREAM-MAPVALUES-0000000025
Processor: KSTREAM-FILTER-0000000038 (stores: [])
--> KSTREAM-MAPVALUES-0000000039
<-- KSTREAM-MAPVALUES-0000000031
Processor: KSTREAM-FILTER-0000000063 (stores: [])
--> KSTREAM-MAPVALUES-0000000064
<-- KSTREAM-MAPVALUES-0000000056
Processor: KSTREAM-MAPVALUES-0000000011 (stores: [])
--> KSTREAM-FILTER-0000000012
<-- KSTREAM-FILTER-0000000010
Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
--> KSTREAM-FILTER-0000000021
<-- KSTREAM-FILTER-0000000019
Processor: KSTREAM-MAPVALUES-0000000034 (stores: [])
--> KSTREAM-FILTER-0000000035
<-- KSTREAM-MAPVALUES-0000000033
Processor: KSTREAM-MAPVALUES-0000000046 (stores: [])
--> KSTREAM-FILTER-0000000047
<-- KSTREAM-MAPVALUES-0000000045
Processor: KSTREAM-MAPVALUES-0000000051 (stores: [])
--> KSTREAM-FILTER-0000000052
<-- KSTREAM-FILTER-0000000050
Processor: KSTREAM-MAPVALUES-0000000059 (stores: [])
--> KSTREAM-FILTER-0000000060
<-- KSTREAM-MAPVALUES-0000000058
Processor: KSTREAM-MAPVALUES-0000000067 (stores: [])
--> KSTREAM-FILTER-0000000068
<-- KSTREAM-MAPVALUES-0000000017
Processor: KSTREAM-FILTER-0000000012 (stores: [])
--> KSTREAM-PEEK-0000000013
<-- KSTREAM-MAPVALUES-0000000011
Processor: KSTREAM-FILTER-0000000021 (stores: [])
--> KSTREAM-PEEK-0000000022
<-- KSTREAM-MAPVALUES-0000000020
Processor: KSTREAM-FILTER-0000000027 (stores: [])
--> KSTREAM-PEEK-0000000028
<-- KSTREAM-FILTER-0000000026
Processor: KSTREAM-FILTER-0000000035 (stores: [])
--> KSTREAM-PEEK-0000000036
<-- KSTREAM-MAPVALUES-0000000034
Processor: KSTREAM-FILTER-0000000047 (stores: [])
--> KSTREAM-PEEK-0000000048
<-- KSTREAM-MAPVALUES-0000000046
Processor: KSTREAM-FILTER-0000000052 (stores: [])
--> KSTREAM-PEEK-0000000053
<-- KSTREAM-MAPVALUES-0000000051
Processor: KSTREAM-FILTER-0000000060 (stores: [])
--> KSTREAM-PEEK-0000000061
<-- KSTREAM-MAPVALUES-0000000059
Processor: KSTREAM-FILTER-0000000068 (stores: [])
--> KSTREAM-PEEK-0000000069
<-- KSTREAM-MAPVALUES-0000000067
Processor: KSTREAM-MAPVALUES-0000000039 (stores: [])
--> KSTREAM-FILTER-0000000040
<-- KSTREAM-FILTER-0000000038
Processor: KSTREAM-MAPVALUES-0000000064 (stores: [])
--> KSTREAM-TRANSFORM-0000000065
<-- KSTREAM-FILTER-0000000063
Processor: KSTREAM-FILTER-0000000040 (stores: [])
--> KSTREAM-SINK-0000000041
<-- KSTREAM-MAPVALUES-0000000039
Processor: KSTREAM-PEEK-0000000013 (stores: [])
--> KSTREAM-SINK-0000000014
<-- KSTREAM-FILTER-0000000012
Processor: KSTREAM-PEEK-0000000022 (stores: [])
--> KSTREAM-SINK-0000000023
<-- KSTREAM-FILTER-0000000021
Processor: KSTREAM-PEEK-0000000028 (stores: [])
--> KSTREAM-SINK-0000000029
<-- KSTREAM-FILTER-0000000027
Processor: KSTREAM-PEEK-0000000036 (stores: [])
--> KSTREAM-SINK-0000000037
<-- KSTREAM-FILTER-0000000035
Processor: KSTREAM-PEEK-0000000048 (stores: [])
--> KSTREAM-SINK-0000000049
<-- KSTREAM-FILTER-0000000047
Processor: KSTREAM-PEEK-0000000053 (stores: [])
--> KSTREAM-SINK-0000000054
<-- KSTREAM-FILTER-0000000052
Processor: KSTREAM-PEEK-0000000061 (stores: [])
--> KSTREAM-SINK-0000000062
<-- KSTREAM-FILTER-0000000060
Processor: KSTREAM-PEEK-0000000069 (stores: [])
--> KSTREAM-SINK-0000000070
<-- KSTREAM-FILTER-0000000068
Processor: KSTREAM-TRANSFORM-0000000065 (stores: [workertask_deduplication])
--> KSTREAM-SINK-0000000066
<-- KSTREAM-MAPVALUES-0000000064
Processor: KTABLE-TOSTREAM-0000000002 (stores: [])
--> log-executionStream
<-- KTABLE-SOURCE-0000000001
Sink: KSTREAM-SINK-0000000014 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000013
Sink: KSTREAM-SINK-0000000023 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000022
Sink: KSTREAM-SINK-0000000029 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000028
Sink: KSTREAM-SINK-0000000037 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000036
Sink: KSTREAM-SINK-0000000041 (topic: kestra_workertaskresult)
<-- KSTREAM-FILTER-0000000040
Sink: KSTREAM-SINK-0000000049 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000048
Sink: KSTREAM-SINK-0000000054 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000053
Sink: KSTREAM-SINK-0000000062 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000061
Sink: KSTREAM-SINK-0000000066 (topic: kestra_workertask)
<-- KSTREAM-TRANSFORM-0000000065
Sink: KSTREAM-SINK-0000000070 (topic: kestra_execution)
<-- KSTREAM-PEEK-0000000069
Processor: log-executionStream (stores: [])
--> none
<-- KTABLE-TOSTREAM-0000000002
For now, it was unclear for me if the solution will be resilient for any concurrency and if I can hit another time an out-of-order (that mean that the execution is rollback in a previous time and that lead to multiple execution of the same task).