0
votes

USECASE

Hazelcast Jet version 0.6.1, Hazelcast version 3.10.2

Given this (simplified) version of a DAG:

VERTICES

S1 Source that emits 5 items of type A (read from DB with partitioning) Local parallelism = 1

S2 Source that emits 150K items of type B (an Iterator that reads from the DB in batches of 100, with partitioning) Local parallelism = 1

AD Processor that adapts types A->A1 and B->B1 and emits one by one

FA Processors.filterP that accepts only items of type A1 and emits one by one

FB Processors.filterP that accepts only items of type B1 and emits one by one

CL Processor that first accumulates all items of type A1; then, when it receives an item of type B1, it enriches it with data taken from the matching A1 and emits it, one by one.

WR Sink that writes B1 Local parallelism = 1

NOTE: Just to explain why the filter processors are there: the DAG has other sources that flow into the same adapter AD and then take other paths through filter processors.

EDGES

S1 --> AD

S2 --> AD

AD --> FA (from ordinal 0)

AD --> FB (from ordinal 1)

FA --> CL (to ordinal 0 with priority 0 distributed and broadcast)

FB --> CL (to ordinal 1 with priority 1)

CL --> WR

PROBLEM

If source S2 has "few" items to load (e.g. 15K), emitFromTraverser never returns false.

If source S2 has "many" items to load (e.g. 150K), emitFromTraverser starts returning false after:

  • All A1 items have been processed by CL
  • About 30% of the B1 items have already been transmitted to CL, but none of them has been processed by CL (the DiagnosticProcessor logs show that the items are sent to CL but not processed)

S2 code for reference:

@Override
protected void init(Context context) throws Exception {
    super.init(context);
    // Iterator that reads B items from the DB in batches of batchSize
    this.iterator = new BQueryIterator(querySupplier, batchSize);
    this.traverser = Traversers.traverseIterator(this.iterator);
}

@Override
public boolean complete() {
    // Returns false while the outbox is full; Jet then calls complete() again later
    return emitFromTraverser(this.traverser);
}

QUESTION

  • Is it expected that CL doesn't process any items until the source ends?
  • Is the usage of priority + distributed + broadcast correct on the CL vertex?

UPDATE

It seems that completeEdge is never called for edge 1 of CL. Can someone tell me why?

Thanks!


2 Answers

2
votes

You suffer from a deadlock caused by priority. Your DAG branches from AD and then rejoins in CL, but with a priority.

AD --+---- FA ----+-- CL
      \          /
       +-- FB --+

Setting a priority means that no item from a lower-priority edge is processed before all items from the higher-priority edge have been processed, and the higher-priority edge only completes once AD itself completes. AD will eventually get blocked by backpressure from the lower-priority path, which CL is not yet draining. So AD is blocked because it can't emit to the lower-priority edge, and CL is blocked because it's still waiting for the higher-priority edge to complete, resulting in a deadlock.

In your case, you can resolve it by making 2 AD vertices, each processing items from one of the sources:

S1 --- AD1 ----+--- CL
              /
S2 --- AD2 --+
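
The deadlock and the fix can be reproduced outside Jet with a toy model. The sketch below is plain Java, not Jet code: the class name, the single bounded counter standing in for the buffers on the lower-priority path, and the capacity of 100 are all assumptions made for illustration. It only models the rule described above: CL drains the lower-priority queue once the higher-priority edge is complete, and that edge completes when its upstream adapter is done.

```java
/** Toy model of a fork-join with edge priority; hypothetical, not Jet code. */
class PriorityDeadlockSketch {

    /**
     * Runs the vertices cooperatively until none of them can make progress.
     *
     * @param aCount        items on the higher-priority path (type A)
     * @param bCount        items on the lower-priority path (type B)
     * @param capacity      assumed total buffer capacity of the lower-priority path
     * @param splitAdapters true models AD1/AD2, false models a single shared AD
     * @return number of B items that CL managed to process
     */
    static int run(int aCount, int bCount, int capacity, boolean splitAdapters) {
        int aLeft = aCount;   // A items not yet delivered to CL
        int bLeft = bCount;   // B items not yet emitted by the adapter
        int queued = 0;       // B items buffered in front of CL
        int processedB = 0;
        boolean progress = true;
        while (progress) {
            progress = false;
            // CL always accepts items from the higher-priority edge.
            if (aLeft > 0) { aLeft--; progress = true; }
            // The adapter can emit a B item only if the buffer has room (backpressure).
            if (bLeft > 0 && queued < capacity) { bLeft--; queued++; progress = true; }
            // The higher-priority edge completes when its upstream adapter is done.
            // A shared AD is done only after it has emitted every A and every B.
            boolean highDone = splitAdapters ? aLeft == 0 : (aLeft == 0 && bLeft == 0);
            // CL touches the lower-priority edge only after the higher one completed.
            if (highDone && queued > 0) { queued--; processedB++; progress = true; }
        }
        return processedB;
    }

    public static void main(String[] args) {
        System.out.println("shared AD, 150 B items: " + run(5, 150, 100, false));
        System.out.println("shared AD,  50 B items: " + run(5, 50, 100, false));
        System.out.println("split AD,  150 B items: " + run(5, 150, 100, true));
    }
}
```

In this model the shared adapter stalls with no B item processed as soon as the B count exceeds the buffer capacity, while a small B count fits in the buffers and runs through. That matches the behaviour in the question, where 15K items work and 150K do not. With split adapters all B items are processed regardless of the count.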
0
votes

After a while I understood what the problem is...

The CL processor cannot know when all the A1 items have been processed, because they all come from the AD processor. So it needs to wait for all the sources feeding AD to complete before it starts processing B1 items.

I'm not sure, but probably once a lot of B items have been loaded, all the Inbox buffers in the DAG become full and can't accept any more B items from S2, while at the same time CL cannot process B1 items to free them up: that's the deadlock.

Maybe the DAG could detect this? I don't know Jet that deeply, but it would be nice to get such a warning.
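
A check along these lines is at least easy to sketch outside Jet. The following is a hypothetical helper, not part of the Jet API: the `E` edge model and all method names are made up for illustration. It flags a vertex that has two inbound edges with different priorities whose upstream paths share a common ancestor, which is the shape of the DAG in the question. A lower number means higher priority, as in the question's edges.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical static check for a prioritized fork-join; not a Jet feature. */
class PriorityForkJoinCheck {

    /** Minimal edge model: source vertex name, destination vertex name, priority. */
    static final class E {
        final String from;
        final String to;
        final int priority;
        E(String from, String to, int priority) {
            this.from = from;
            this.to = to;
            this.priority = priority;
        }
    }

    /** Returns the given vertex plus every vertex it can be reached from. */
    static Set<String> ancestors(String vertex, List<E> edges) {
        Set<String> seen = new HashSet<>();
        Deque<String> stack = new ArrayDeque<>();
        stack.push(vertex);
        while (!stack.isEmpty()) {
            String current = stack.pop();
            if (seen.add(current)) {
                for (E e : edges) {
                    if (e.to.equals(current)) {
                        stack.push(e.from);
                    }
                }
            }
        }
        return seen;
    }

    /** True if some vertex joins a higher- and a lower-priority path with a shared ancestor. */
    static boolean hasRisk(List<E> edges) {
        for (E high : edges) {
            for (E low : edges) {
                if (high.to.equals(low.to) && high.priority < low.priority) {
                    Set<String> common = ancestors(high.from, edges);
                    common.retainAll(ancestors(low.from, edges));
                    if (!common.isEmpty()) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    /** The DAG from the question: both paths into CL start at the shared AD. */
    static List<E> originalDag() {
        return Arrays.asList(
                new E("S1", "AD", 0), new E("S2", "AD", 0),
                new E("AD", "FA", 0), new E("AD", "FB", 0),
                new E("FA", "CL", 0), new E("FB", "CL", 1),
                new E("CL", "WR", 0));
    }

    /** The DAG with one adapter per source, so the two paths are independent. */
    static List<E> splitDag() {
        return Arrays.asList(
                new E("S1", "AD1", 0), new E("S2", "AD2", 0),
                new E("AD1", "FA", 0), new E("AD2", "FB", 0),
                new E("FA", "CL", 0), new E("FB", "CL", 1),
                new E("CL", "WR", 0));
    }
}
```

Calling `hasRisk(originalDag())` reports the problem because AD, S1 and S2 are upstream of both inputs of CL, while `hasRisk(splitDag())` does not. The check is conservative: it only looks at the graph shape, so it would also flag DAGs whose data volume happens to fit in the buffers.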

Or maybe there is some logging that can be enabled?

I hope someone can confirm my answer and suggest how to detect and avoid these problems.