2 votes

The flat file contains the following data, without a header, which needs to be loaded into a MySQL table.

101,AAA,1000,10
102,BBB,5000,20

I use a GetFile or GetSFTP processor to read the data, so the flow file contains the data above. I want to load only the 1st, 2nd, and 4th columns into the MySQL table. The output I expect in the MySQL table is as below.

101,AAA,10
102,BBB,20

Can you please help me with how to extract only a few columns from an incoming flow file in NiFi and load them into MySQL?


3 Answers

1 vote

This is just one way to do it; there are several others. This method uses Records and avoids modifying the underlying data - it simply ignores the fields you don't want during the insert. This is beneficial when integrating with a larger flow where the data is used by other processors that might expect the original data, or where you are already using Records.

Let's say your table has the columns

id | name | value

and your data looks like

101,AAA,1000,10
102,BBB,5000,20
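Such a table could be created in MySQL along these lines (just a sketch; the table name and column types are assumptions, not taken from the question):

CREATE TABLE example_target (
     id    VARCHAR(10),   -- will be matched to field 1 of the CSV
     name  VARCHAR(50),   -- will be matched to field 2 of the CSV
     value VARCHAR(20)    -- will be matched to field 4 of the CSV
);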

You could use a PutDatabaseRecord processor with Unmatched Field Behavior and Unmatched Column Behavior set to Ignore Unmatched... and add a CSVReader as the Record Reader.

In the CSVReader you could set the Schema Access Strategy to Use 'Schema Text' Property. Then set the Schema Text property to the following:

{
     "type": "record",
     "namespace": "nifi",
     "name": "db",
     "fields": [
       { "name": "id", "type": "string" },
       { "name": "name", "type": "string" },
       { "name": "ignoredField", "type": "string" },
       { "name": "value", "type": "string" }
     ]
} 

This would match the NiFi Record fields against the DB table columns, matching fields 1, 2, and 4 while ignoring field 3 (as it did not match a column name).

Obviously, amend the field names in the Schema Text schema to match the column names of your DB table. You can also do data type checking/conversion here.
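For example, if the id and value columns were defined as INT in MySQL, you could declare them as int in the schema so the reader performs the conversion (a sketch; adjust the names and types to your actual table):

{
     "type": "record",
     "namespace": "nifi",
     "name": "db",
     "fields": [
       { "name": "id", "type": "int" },
       { "name": "name", "type": "string" },
       { "name": "ignoredField", "type": "string" },
       { "name": "value", "type": "int" }
     ]
}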

(Screenshots of the PutDatabaseRecord and CSVReader configuration omitted.)

1 vote

Another method could be to convert your flow file to a record with the help of ConvertRecord. It helps transform from CSV format to whatever you prefer; you can also keep the CSV format.

But with your flow file being a record, you can now use additional processors like QueryRecord, which lets you run SQL-like commands on the flow file:

"SELECT * FROM FLOWFILE"

and in your case, you can do:

"SELECT col1,col2,col3 FROM FLOWFILE"

You can also apply filtering directly:

"SELECT col1,col2,col3 FROM FLOWFILE WHERE col1>500"

I would also recommend reading the NiFi documentation on QueryRecord and record-oriented processing.

0 votes

Thank you very much pdeuxa and Sdairs for your replies; your inputs were helpful. I tried a method similar to what both of you described: I used ConvertRecord and configured a CSVReader and a CSVRecordSetWriter. The CSVReader has the following schema to read the data:

{
     "type": "record",
     "namespace": "nifi",
     "name": "db",
     "fields": [
       { "name": "id", "type": "string" },
       { "name": "name", "type": "string" },
       { "name": "Salary", "type": "string" },
       { "name": "dept", "type": "string" }
     ]
} 

The CSVRecordSetWriter has the following output schema. There are 4 fields in the input schema, while the output schema has only 3 columns:

{
     "type": "record",
     "namespace": "nifi",
     "name": "db",
     "fields": [
       { "name": "id", "type": "string" },
       { "name": "name", "type": "string" },
       { "name": "dept", "type": "string" }
     ]
} 

I was able to run this successfully. Thanks for your input, guys.