0 votes

I have a NiFi flow which fetches data from RDS tables and loads it into S3 as flat files. Now I need to generate another file whose content is the name of the file I am loading into the S3 bucket; this needs to be a separate flow.

Example: if the extracted RDS flat file is named RDS.txt, then the newly generated file should have rds.txt as its content, and I need to load this file into the same S3 bucket.

The problem I face is that I am using a GenerateFlowFile processor and adding the flat file name as custom text in the flowfile, but I cannot set up any upstream connection for the GenerateFlowFile processor, so it keeps generating more files. If I use a MergeContent processor after the GenerateFlowFile processor, I see duplicate values in the flowfile.

Can anyone help me out with this?

I think you need to rephrase your question, I am not sure I get it. Q1 - who manages the RDS filename? Q2 - why use GenerateFlowFile? Is that your trigger? - Up_One
Agreed - it's not clear why you are using GenerateFlowFile - perhaps provide a screenshot of your flow? Sounds like it should just be QueryDB -> UpdateAttribute -> PutS3? - Sdairs
@Up_One, I will clarify the request. I have two requirements: 1. Extract data from RDS and load it into S3 as a text file. 2. Generate a second text file whose only content is the name of the text file we loaded into S3. Flow 1 is working perfectly. For the second file I am using a GenerateFlowFile processor that carries the name of the text file, followed by MergeContent and PutS3Object. The problem I am now facing is that GenerateFlowFile keeps generating more flowfiles, so my S3 bucket is flooded with files. - Vasanth Kumar

2 Answers

0 votes

I have a NiFi flow which fetches data from RDS tables and loads it into S3 as flat files. Now I need to generate another file whose content is the name of the file I am loading into the S3 bucket; this needs to be a separate flow.

The easiest path is to chain something after PutS3Object that updates the flowfile contents with what you want. It is really simple to write with ExecuteScript (Groovy). Something like this:

import org.apache.nifi.processor.io.OutputStreamCallback

def ff = session.get()
if (ff) {
  // Replace the flowfile content with the value of its "filename" attribute
  def updated = session.write(ff, {
    it.write(ff.getAttribute("filename").bytes)
  } as OutputStreamCallback)
  // Flag the flowfile so a downstream RouteOnAttribute can tell it has already been rewritten
  updated = session.putAttribute(updated, "is_updated", "true")
  session.transfer(updated, REL_SUCCESS)
}

Then you can put a RouteOnAttribute after PutS3Object and have it route to either a dead end if it detects the is_updated attribute, or back to PutS3Object if it has not been updated yet; a sketch of that configuration is below.
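A minimal sketch of the RouteOnAttribute configuration, assuming the is_updated attribute set by the script above (the user-defined property name is your choice; its name becomes the matched relationship):

Routing Strategy : Route to Property name
is_updated       : ${is_updated:equals('true')}

Connect the is_updated relationship to a terminating point (or auto-terminate it) and route unmatched back to PutS3Object.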

0 votes

I got a simple solution for this: I added a funnel before PutS3Object. The upstream of the funnel receives two files, one with the extract and the other with the file name; the downstream of the funnel is connected to PutS3Object, so both files are loaded at the same time. A rough sketch of the wiring is below.
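For illustration only, a rough sketch of that wiring (how the filename-only flowfile is produced is not stated in the answer, so that part is an assumption):

(flow 1: RDS extract flowfile) ---------\
                                         Funnel --> PutS3Object
(flow 2: filename-only flowfile) -------/

By default PutS3Object uses ${filename} as the object key, so the two flowfiles are written as two separate objects in the same bucket.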