3
votes

In NiFi, I have a cron-driven sequence of processors that produces a daily set of flowfiles. Each flowfile carries two attributes I am interested in: product_code and publication_date.

I need to keep only one flowfile per product_code: the one with the most recent publication_date.

Example:

For this input:

flow_1: product_code: A / publication_date : 2018-01-01
flow_2: product_code: B / publication_date : 2018-01-01
flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_5: product_code: A / publication_date : 2000-12-31
flow_6: product_code: B / publication_date : 2018-02-02
flow_7: product_code: B / publication_date : 2018-03-03

The expected output is:

flow_3: product_code: C / publication_date : 2018-01-01
flow_4: product_code: A / publication_date : 2018-04-12
flow_7: product_code: B / publication_date : 2018-03-03

The algorithm I tested

  1. Use an UpdateAttribute processor to add an attribute priority to each flowfile, based on the publication_date.
  2. These updated flowfiles are redirected to a PriorityAttributePrioritizer queue.
  3. The flowfiles stay in this queue because there is only one consuming processor, which is cron driven. This way, I am sure that the flowfiles in the queue are ordered by publication_date.
  4. Then the CRON schedule triggers the next processor, a DetectDuplicate based on the product_code attribute. As the flowfiles are processed from the most recent item to the oldest one, I am sure that when a product_code is detected as a duplicate, it is because the same product_code has already passed through with a more recent publication_date.
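The four steps above can be sketched outside NiFi as a plain sort followed by a first-seen filter. This is only an illustration of the intended logic, not NiFi code: the tuple layout and the function name `keep_latest_per_product` are assumptions made for the example, and the data is the sample input from the question.

```python
from datetime import date

# Sample input from the question: (flowfile name, product_code, publication_date).
flowfiles = [
    ("flow_1", "A", date(2018, 1, 1)),
    ("flow_2", "B", date(2018, 1, 1)),
    ("flow_3", "C", date(2018, 1, 1)),
    ("flow_4", "A", date(2018, 4, 12)),
    ("flow_5", "A", date(2000, 12, 31)),
    ("flow_6", "B", date(2018, 2, 2)),
    ("flow_7", "B", date(2018, 3, 3)),
]


def keep_latest_per_product(items):
    """Hypothetical helper: keep the most recent flowfile per product_code."""
    # Steps 1-3: order the whole set from most recent to oldest.
    ordered = sorted(items, key=lambda f: f[2], reverse=True)
    seen = set()
    kept = []
    # Step 4: the first occurrence of a product_code is the most recent one;
    # every later occurrence is a duplicate and is dropped.
    for name, code, published in ordered:
        if code in seen:
            continue
        seen.add(code)
        kept.append((name, code, published))
    return kept


for name, code, published in sorted(keep_latest_per_product(flowfiles)):
    print(name, code, published.isoformat())
# flow_3 C 2018-01-01
# flow_4 A 2018-04-12
# flow_7 B 2018-03-03
```

The sketch only works because the complete set is sorted before the filter runs, which is exactly the property the queue has to guarantee in the flow.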

The issue

Sadly, when the cron triggers the DetectDuplicate processor, only one message is consumed, and the others stay in the queue.

If I change the "Scheduling strategy" to "Timer driven" with a "Run schedule" of 0, all my flowfiles are consumed and the output is what is expected.

Is there a way to ask my DetectDuplicate processor to consume all the messages in the queue when it starts to work (and not only one message)?

Or is there a way to set up a scheduling strategy like "Start to work at 2:00 AM and stop at 4:00 AM"?

Can you think of better strategies to meet this need?

Regards,

Val.


Update 1

(2018-04-13) More information, in addition to Bryan Bende's comments.

I know CRON is not the best solution, but I do not know how to improve my algorithm to get rid of it.

In my case, the flowFiles that are queued to be deduplicated are generated via a sequence of 3 REST calls:

  • 1st call to "GetAllCategories",
  • then for each category, call the "GetSubCategories",
  • and for each subCategory, call the "GetProducts".

This flowFile generation step generally takes around 5 minutes: last night the first flowFile arrived in the queue at 2:00:16 AM and the last one at 2:04:58 AM. (That's why I scheduled the DetectDuplicate to run at 3:00 AM.)

If my DetectDuplicate processor were "Timer driven", it would consume the first flowFiles arriving in the queue before all the flowFiles were there.

And this would break the ordering of the full set of flowFiles.
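A small sketch of that concern, using the sample data from the question in the order the flowfiles were listed. The function name `dedupe_on_arrival` is hypothetical, and the sketch assumes duplicates are detected in pure arrival order, with no prior sorting:

```python
def dedupe_on_arrival(items):
    """Keep the first flowfile seen per product_code, in arrival order."""
    seen = set()
    kept = []
    for name, code, _published in items:
        if code not in seen:
            seen.add(code)
            kept.append(name)
    return kept


# Sample data from the question, consumed as it arrives (assumed order).
arrival_order = [
    ("flow_1", "A", "2018-01-01"),
    ("flow_2", "B", "2018-01-01"),
    ("flow_3", "C", "2018-01-01"),
    ("flow_4", "A", "2018-04-12"),
    ("flow_5", "A", "2000-12-31"),
    ("flow_6", "B", "2018-02-02"),
    ("flow_7", "B", "2018-03-03"),
]

print(dedupe_on_arrival(arrival_order))  # ['flow_1', 'flow_2', 'flow_3']
```

Here flow_1 and flow_2 are kept instead of the expected flow_4 and flow_7, because the more recent flowfiles arrive after their product_code has already been seen.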

I feel like I have to wait for all the flowfiles to be in the queue before the DetectDuplicate processor starts working.

Do you have potential suggestions to improve my algorithm?


2 Answers

4
votes

You should generally use CRON scheduling for the source processor that starts the flow and then all other processors should be Timer Driven with Run Schedule of 0.

For example, if you pick up files from a directory every day at 2:00 AM, then GetFile should be scheduled with a CRON expression to start the flow at 2:00 AM, but nothing beyond that needs CRON scheduling because they will never receive data unless GetFile runs.

In the case where you want a processor to wait to execute until all flow files are available, you may be able to use the Wait/Notify processors, such that all the flow files build up in front of a Wait processor before being released to the DetectDuplicate processor.
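The gating idea can be illustrated with a small sketch. It is a conceptual stand-in only, not the NiFi Wait/Notify API: the class `ReleaseGate`, its methods, and the assumption that the upstream part of the flow can emit a single "generation finished" signal are all hypothetical.

```python
class ReleaseGate:
    """Hypothetical stand-in for the Wait/Notify pattern (not a NiFi API)."""

    def __init__(self, expected_signals):
        self.expected_signals = expected_signals
        self.signals = 0
        self.waiting = []

    def wait(self, flowfile):
        # Like a Wait step: hold the flowfile until the gate is open.
        self.waiting.append(flowfile)
        return self._release_if_ready()

    def notify(self):
        # Like a Notify step: record a signal, then release if enough arrived.
        self.signals += 1
        return self._release_if_ready()

    def _release_if_ready(self):
        if self.signals < self.expected_signals:
            return []
        released, self.waiting = self.waiting, []
        return released


gate = ReleaseGate(expected_signals=1)
print(gate.wait("flow_1"))  # []
print(gate.wait("flow_4"))  # []
print(gate.notify())        # ['flow_1', 'flow_4']
```

Nothing reaches the downstream step until the signal arrives, so the complete set can be ordered and deduplicated in one pass.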

1
votes

The reason only one message gets consumed is that, when CRON scheduling is enabled on all the processors (the source as well as the consuming/downstream processors), execution goes like this:

Example: suppose every processor has a CRON schedule that fires every day at 2 PM. At each trigger, a processor consumes one flowfile from the queue fed by its upstream processor (e.g. GetFile); the rest of the flowfiles stay in the queue, and the next one is consumed only at 2 PM the following day, and so on. The same applies further downstream: each processor also consumes only one flowfile at a time, once a day at 2 PM, which is a disaster in the making. Who wants processing to run at a snail's pace?
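A rough way to picture the cost, assuming (as described above) that each trigger consumes a fixed number of flowfiles. The function `triggers_needed` and its parameters are hypothetical and only model the counting, not NiFi itself:

```python
from collections import deque


def triggers_needed(queue_size, flowfiles_per_trigger=1):
    """Count how many triggers it takes to drain a queue of the given size."""
    queue = deque(range(queue_size))
    triggers = 0
    while queue:
        triggers += 1
        # Each trigger removes at most flowfiles_per_trigger items.
        for _ in range(min(flowfiles_per_trigger, len(queue))):
            queue.popleft()
    return triggers


print(triggers_needed(7))  # 7
```

With a daily CRON schedule and one flowfile per trigger, the 7 flowfiles from the question would take 7 days to get through a single processor.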

That's why you should follow the approach @Bryan mentioned. Only the source processor of the flow should be CRON driven; the rest of the processors should be Timer driven with a run schedule of your choice, though 0 sec is generally used so that each flowfile is consumed as it arrives.