In NiFi, I have a cron-driven sequence of processors that produces a daily set of flowfiles, each carrying two attributes I am interested in: product_code and publication_date.
I need to keep only one flowfile per product_code: the one with the most recent publication_date.
For example, given this input:
flow_1: product_code: A / publication_date : 2018-01-01
flow_2: product_code: B / publication_date : 2018-01-01
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_5: product_code: A / publication_date : 2000-12-31
flow_6: product_code: B / publication_date : 2018-02-02
flow_7: product_code: B / publication_date : 2018-03-03
The expected output is:
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_7: product_code: B / publication_date : 2018-03-03
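To make the rule concrete outside NiFi, here is a minimal Python sketch of the selection I am after. It is only an illustration of the expected result, not NiFi code; the tuple layout and the function name `latest_per_product` are my own assumptions.

```python
from datetime import date

# Example input from above: (flowfile name, product_code, publication_date).
flowfiles = [
    ("flow_1", "A", date(2018, 1, 1)),
    ("flow_2", "B", date(2018, 1, 1)),
    ("flow_3", "C", date(2018, 1, 1)),
    ("flow_4", "A", date(2018, 4, 12)),
    ("flow_5", "A", date(2000, 12, 31)),
    ("flow_6", "B", date(2018, 2, 2)),
    ("flow_7", "B", date(2018, 3, 3)),
]


def latest_per_product(items):
    """Keep, for each product_code, the item with the latest publication_date."""
    latest = {}
    for name, code, published in items:
        # Replace the stored item only when this one is strictly more recent.
        if code not in latest or published > latest[code][2]:
            latest[code] = (name, code, published)
    # Sort by flowfile name so the result is easy to compare with the list above.
    return sorted(latest.values())


kept = latest_per_product(flowfiles)
print([name for name, _, _ in kept])
```

With the example input, this keeps flow_3, flow_4 and flow_7, which is the output I expect from the flow.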
The algorithm I tested
- Use an UpdateAttribute processor to add a priority attribute to each flowfile, based on the publication_date.
- These updated flowfiles are routed to a queue configured with the PriorityAttributePrioritizer.
- The flowfiles stay in this queue because its only consumer is a cron-driven processor. This way, I am sure that the flowfiles in the queue are ordered by publication_date.
- Then the cron triggers the next processor, a DetectDuplicate based on the product_code attribute. As the flowfiles are processed from the most recent to the oldest, I am sure that when a product_code is detected as a duplicate, it is because the same product_code has already passed with a more recent publication_date.
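The steps above can be sketched as a small Python simulation. This is only a model of the idea, not how NiFi is implemented: the sorted list stands in for the prioritized queue, the set stands in for the DetectDuplicate cache, and the `priority` encoding (negated date ordinal) is a hypothetical choice that assumes smaller priority values are dequeued first.

```python
from datetime import date

# Example input: (flowfile name, product_code, publication_date).
flowfiles = [
    ("flow_1", "A", date(2018, 1, 1)),
    ("flow_2", "B", date(2018, 1, 1)),
    ("flow_3", "C", date(2018, 1, 1)),
    ("flow_4", "A", date(2018, 4, 12)),
    ("flow_5", "A", date(2000, 12, 31)),
    ("flow_6", "B", date(2018, 2, 2)),
    ("flow_7", "B", date(2018, 3, 3)),
]


def priority(published):
    # Hypothetical encoding: the most recent date gets the smallest value,
    # assuming the queue hands out the smallest priority first.
    return -published.toordinal()


def simulate(items):
    # Stand-in for the prioritized queue: everything is queued, then ordered.
    queue = sorted(items, key=lambda ff: priority(ff[2]))
    seen = set()  # stand-in for the duplicate-detection cache
    non_duplicates = []
    for name, code, _published in queue:
        if code in seen:
            continue  # an item with a more recent date already passed
        seen.add(code)
        non_duplicates.append(name)
    return non_duplicates


result = simulate(flowfiles)
print(result)
```

The model only gives the right answer because the whole set is in the queue before the first item is consumed, which is the reason I rely on the cron trigger.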
The issue
Sadly, when the cron triggers the DetectDuplicate processor, only one flowfile is consumed, and the others stay in the queue.
If I change the "Scheduling strategy" to "Timer driven" with a "Run schedule" of 0, all my flowfiles are consumed and the output is what is expected.
Is there a way to make my DetectDuplicate processor consume all the flowfiles in the queue when it is triggered (and not only one)?
Or is there a way to set up a scheduling strategy like "Start to work at 2:00 AM and stop at 4:00 AM"?
Can you think of better strategies to meet this need?
Regards,
Val.
Update 1
(2018-04-13) More information, in addition to Bryan Bende's comments.
I know CRON is not the best solution, but I do not know how to improve my algorithm to get rid of it.
In my case, the flowFiles that are queued to be deduplicated are generated via a sequence of 3 REST calls:
- 1st call to "GetAllCategories",
- then for each category, call the "GetSubCategories",
- and for each subCategory, call the "GetProducts".
This flowfile generation step generally lasts around 5 minutes: last night the first flowfile arrived in the queue at 2:00:16 AM and the last one at 2:04:58 AM. (That's why I scheduled the DetectDuplicate processor to run at 3:00 AM.)
If my DetectDuplicate processor were "Timer driven", it would consume the first flowfiles arriving in the queue before all the others are there.
And this would break the ordering of the full set of flowfiles.
I feel like I have to wait for all the flowfiles to be in the queue before the DetectDuplicate processor starts working.
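To illustrate the concern, here is a minimal Python sketch of what I expect to happen if each flowfile is deduplicated as soon as it arrives. It assumes, purely for illustration, that the flowfiles arrive in the order flow_1 to flow_7 from my example and that each one is consumed before the next arrives; the function name `first_seen` is hypothetical.

```python
# Example input in assumed arrival order: (name, product_code, publication_date).
arrivals = [
    ("flow_1", "A", "2018-01-01"),
    ("flow_2", "B", "2018-01-01"),
    ("flow_3", "C", "2018-01-01"),
    ("flow_4", "A", "2018-04-12"),
    ("flow_5", "A", "2000-12-31"),
    ("flow_6", "B", "2018-02-02"),
    ("flow_7", "B", "2018-03-03"),
]


def first_seen(items):
    """Deduplicate on product_code in arrival order, ignoring the dates."""
    seen = set()
    kept = []
    for name, code, _published in items:
        if code not in seen:
            seen.add(code)
            kept.append(name)
    return kept


kept = first_seen(arrivals)
print(kept)
```

Under that assumption the first arrival per product_code wins (flow_1, flow_2, flow_3), so products A and B keep an outdated publication_date instead of flow_4 and flow_7.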
Do you have potential suggestions to improve my algorithm?