
In my project I'm reading data from a Mongo database on a schedule, performing some processing on each new record, and then forwarding it to another processor.

I want to ensure that none of the records I read from Mongo have already been processed, so I'm using a second MongoDB collection as a tracking table.

I'd like to have logic that works like this:

  • Read from the initial database [GetMongo processor]
  • If the record doesn't exist yet in the tracking DB [GetMongo], insert it [PutMongo]
  • If the record already exists in the tracking DB, terminate the flow (we can assume it has already been processed)
  • The tracking DB will be queried later by a separate processor in another flow on another schedule

My problem is with the "continue only if the record doesn't exist in the tracking table" step.

If I use a GetMongo processor, it appears to terminate the flow when no records are returned, and it follows the success path when results are returned, which is the case I actually want to terminate. For both the records and no_records outcomes, the original relationship is triggered with the incoming FlowFile, but I don't think that's going to help me in this flow.

I'm leaning towards using an ExecuteScript processor and calling Mongo's db.collection.count() from ECMAScript, then routing on the result, but ideally I'd like a cleaner option if one exists.

I expected "success" and "failure" to act as that kind of trigger, but it didn't work well, so I gave up on pure GetMongo. - KyungHoon Kim

1 Answer


After looking at the DetectDuplicate processor, I decided that its cache-based nature made it unsuitable for my purposes.

I ended up using the Maven archetype to create a custom processor to handle this logic, which splits into matched and unmatched relationships based on whether the query returned any results.

Link to the archetype to create your own processor - https://mvnrepository.com/artifact/org.apache.nifi/nifi-processor-bundle-archetype
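
For illustration, here is a minimal sketch of what the routing logic inside such a processor could look like. It is not the actual processor from this answer: the class name, the record.id attribute, and the hard-coded connection, database, and collection names are placeholder assumptions.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

import org.bson.Document;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;

public class TrackingCollectionRouter extends AbstractProcessor {

    public static final Relationship REL_MATCHED = new Relationship.Builder()
            .name("matched")
            .description("The record already exists in the tracking collection")
            .build();

    public static final Relationship REL_UNMATCHED = new Relationship.Builder()
            .name("unmatched")
            .description("The record is not yet in the tracking collection")
            .build();

    private volatile MongoCollection<Document> tracking;

    @OnScheduled
    public void connect(final ProcessContext context) {
        // Placeholder connection details; a real processor would expose these
        // as PropertyDescriptors and close the client in an @OnStopped method.
        final MongoClient client = MongoClients.create("mongodb://localhost:27017");
        tracking = client.getDatabase("mydb").getCollection("tracking");
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Assumes the record's id was promoted to a FlowFile attribute upstream
        // (e.g. by EvaluateJsonPath); "record.id" is a placeholder attribute name.
        final String recordId = flowFile.getAttribute("record.id");

        // Route on whether the tracking collection already contains this record.
        final long count = tracking.countDocuments(Filters.eq("recordId", recordId));
        session.transfer(flowFile, count > 0 ? REL_MATCHED : REL_UNMATCHED);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return new HashSet<>(Arrays.asList(REL_MATCHED, REL_UNMATCHED));
    }
}
```

In the flow described in the question, unmatched would feed the PutMongo insert and the downstream processing, while matched would simply be auto-terminated.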