I'd like to confirm my assumption about the tryProcess() logic.
Specifically, how does the return value (true/false) influence the DAG workflow on a processor with two incoming edges and no priority specified?
My assumption is that if the processor has incoming items on both edges and tryProcess() returns false for one of them, the other edge will be processed (if more items are available on that edge). Processing would then alternate between the edges, depending on which one rejects items and which one accepts them.
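To make that assumption concrete, here is a toy loop in plain Java. It is not Jet code and does not claim to reproduce Jet's real scheduler; `TwoEdgeHandler`, `AssumedScheduling` and `maxCalls` are hypothetical names invented for this illustration. It only shows the behaviour I am assuming: a rejected item stays at the head of its edge, and the loop then moves on to the other edge.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a processor with two input edges; not a Jet interface.
interface TwoEdgeHandler {
    // Returns true if the item was consumed, false if it should be offered again later.
    boolean tryProcess(int ordinal, Object item);
}

final class AssumedScheduling {
    // Stays on an edge while it accepts items and switches to the other edge
    // when the current one rejects an item or is empty.
    // Returns the edge ordinals in the order tryProcess was called.
    static List<Integer> run(Deque<Object> edge0, Deque<Object> edge1,
                             TwoEdgeHandler handler, int maxCalls) {
        List<Integer> calls = new ArrayList<>();
        int ordinal = 0;
        while (calls.size() < maxCalls && !(edge0.isEmpty() && edge1.isEmpty())) {
            Deque<Object> inbox = (ordinal == 0) ? edge0 : edge1;
            boolean accepted = false;
            if (!inbox.isEmpty()) {
                calls.add(ordinal);
                accepted = handler.tryProcess(ordinal, inbox.peek());
                if (accepted) {
                    inbox.remove(); // consumed: drop it from the head of the edge
                }
            }
            if (!accepted) {
                ordinal = 1 - ordinal; // rejected or empty: try the other edge
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        Deque<Object> edge0 = new ArrayDeque<>();
        edge0.add("a1");
        Deque<Object> edge1 = new ArrayDeque<>();
        edge1.add("b1");
        edge1.add("b2");
        // Handler that always rejects edge #0 and always accepts edge #1.
        List<Integer> calls = run(edge0, edge1, (ordinal, item) -> ordinal == 1, 6);
        System.out.println(calls);
    }
}
```

Under this assumed model, a rejection on edge #0 never prevents edge #1 from being drained, which is exactly what I do not observe in the problem described below.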
Problem
Sometimes one processor instance gets stuck on tryProcess(#0), which keeps returning false (because the processor is waiting for a new item from the other edge). tryProcess(#0) is called repeatedly and tryProcess(#1) is never called. I'm sure that completeEdge() is never called on the processor for either edge #0 or edge #1, so I expect there are more items to process from edge #1. This usually happens after running the same job multiple times.
To better explain the question, here is my use case:
Use Case
My data model is composed of the following objects:
- A: Object identified by "ida" attribute
- B: Object identified by "idb" attribute. It has a reference to A using "ida" value
- AB: Object that couples B object and its referenced A object
I need to match each B object with the A object it references and emit the pair as an AB object.
I have a DAG with this setup:
Vertices
- S-A: Source items of type "A" (localParallelism(1), emits A objects sorted by "ida" attribute)
- S-B: Source items of type "B" (localParallelism(1), emits B objects sorted by referenced "ida" attribute)
- C-AB: Processor that matches B objects with referenced A object (emits AB objects)
Connections
- S-A -> C-AB : incoming edge #0 (no priority specified, partitioned by "ida" attribute)
- S-B -> C-AB : incoming edge #1 (no priority specified, partitioned by reference to "ida" attribute)
The environment is a Hazelcast Jet cluster with 2 nodes.
Logic
The C-AB processor takes the first "A" object (from edge #0) and keeps it until all "B" objects related to that "A" object have been processed. If it receives another "A" object in the meantime, it returns false from tryProcess(#0).
While it receives "B" objects (from edge #1) that match the current "A", it emits "AB" objects.
When the processor receives a "B" object that references a later "A" object, it discards the current "A" and waits for the next one.
If it receives a "B" object before it has the referenced "A" object, it waits for the proper "A", returning false from tryProcess(#1) if a new "B" arrives in the meantime.
This should work because S-A and S-B emit properly sorted objects, and the edges are partitioned so that objects with the same "ida" value are sent to the same processor.
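The matching rules above can be sketched in plain Java as follows. This is a simplified, dependency-free approximation and not my actual Jet processor: the classes `A`, `B`, `AB` and `MatcherSketch` are hypothetical, the ids are assumed to be integers, and the result list stands in for the processor's output. As a simplification, a "B" that cannot be matched yet is rejected (so it would be offered again) rather than held inside the processor. The sketch also assumes every "B" references an "A" that actually arrives on edge #0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical data model; only the "ida"/"idb" attributes come from the description.
final class A {
    final int ida;
    A(int ida) { this.ida = ida; }
}

final class B {
    final int idb;
    final int ida; // reference to the owning A
    B(int idb, int ida) { this.idb = idb; this.ida = ida; }
}

final class AB {
    final A a;
    final B b;
    AB(A a, B b) { this.a = a; this.b = b; }
}

// Plain-Java stand-in for the C-AB processor logic (no Jet types involved).
final class MatcherSketch {
    private A currentA;                         // the "A" currently being matched, if any
    final List<AB> emitted = new ArrayList<>(); // stands in for the processor output

    // Edge #0: accept a new A only when no A is currently held.
    boolean tryProcess0(A a) {
        if (currentA != null) {
            return false; // still matching Bs against the current A
        }
        currentA = a;
        return true;
    }

    // Edge #1: emit AB on a match, otherwise ask for the item to be offered again.
    boolean tryProcess1(B b) {
        if (currentA == null) {
            return false; // the referenced A has not arrived yet
        }
        if (b.ida == currentA.ida) {
            emitted.add(new AB(currentA, b));
            return true;
        }
        currentA = null;  // b belongs to a later A: discard the current one
        return false;     // and wait for the next A before retrying b
    }

    public static void main(String[] args) {
        MatcherSketch m = new MatcherSketch();
        System.out.println(m.tryProcess0(new A(1)));     // accepted: no A held yet
        System.out.println(m.tryProcess0(new A(2)));     // rejected: A(1) still held
        System.out.println(m.tryProcess1(new B(10, 1))); // matches A(1), AB emitted
        System.out.println(m.emitted.size());
    }
}
```

Note that this logic only makes progress if the caller offers items from the other edge after a rejection, which is the scheduling behaviour I am asking about.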