2 votes

I have a CSV with 70 columns. The 60th column contains a value which decides whether the record is valid or invalid. If the 60th column has 0, 1, 6 or 7 the record is valid; if it contains any other value it is invalid.

I realised that this functionality wasn't possible by relying solely on changing the properties of processors in Apache NiFi. Therefore I decided to use the ExecuteScript processor and added this Python code as the script body.

import csv

valid = 0
invalid = 0
total = 0

with open('/Users/himsaragallage/Desktop/redder/Regexo_2019101812750.dat.csv') as f, \
        open("valid.csv", "w") as file1, \
        open("invalid.csv", "w") as file2:
    reader = csv.reader(f)
    valid_writer = csv.writer(file1)
    invalid_writer = csv.writer(file2)
    for row in reader:  # iterate the parsed rows, not the raw file lines
        total += 1
        # column 60 (index 59) decides whether the record is valid
        if row[59] in ("0", "1", "6", "7"):
            valid += 1
            valid_writer.writerow(row)
        else:
            invalid += 1
            invalid_writer.writerow(row)

print("Total : " + str(total))
print("Valid : " + str(valid))
print("Invalid : " + str(invalid))

I have no idea how to use a session and write code within the ExecuteScript processor as shown in this question, so I just wrote a simple Python script and directed the valid and invalid data to different files. The approach I have used has many limitations:

  1. I want to be able to dynamically process CSVs with different filenames.
  2. The CSV which the invalid data is sent to must also have the same filename as the input CSV.
  3. There would be around 20 CSVs in my redder folder. All of them must be processed in one go.

I hope you can suggest a method to do the above. Feel free to provide a solution by editing the Python code I have used, or by using a completely different set of processors and excluding the ExecuteScript processor altogether.

You can look into the QueryRecord processor instead of doing this in a Jython script. With that processor, you can simply add a new relationship that says "select * from FLOWFILE where column60 in (0,1,6,7)". - Pushkr
@Pushkr Can you clarify which property/configuration must be changed to "select * from FLOWFILE where column60 in (0,1,6,7)" within the QueryRecord processor? - Himsara Gallege

2 Answers

2 votes

Here are complete step-by-step instructions on how to use the QueryRecord processor.

Basically, you need to set up the highlighted properties:

[screenshot of the QueryRecord processor configuration]
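
Since the screenshot does not come through here, a rough sketch of the properties to set (the column name column60 is taken from the comment above and depends on how your record reader names the 60th column):

    Record Reader    CSVReader
    Record Writer    CSVRecordSetWriter
    valid            select * from FLOWFILE where column60 in (0,1,6,7)
    invalid          select * from FLOWFILE where column60 not in (0,1,6,7)

Each user-defined property becomes an outgoing relationship of the processor, so valid and invalid records leave QueryRecord on separate relationships.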

1 vote

You want to route records based on values from one column. There are various ways to make this happen in NiFi; one of them is the PartitionRecord processor.

I will show you how to solve your problem using the PartitionRecord processor. Since you did not provide any example data, I created an example use case: I want to distinguish cities in Europe from cities elsewhere. The following data is given:

id,city,country
1,Berlin,Germany
2,Paris,France
3,New York,USA
4,Frankfurt,Germany

Flow:

[screenshot of the flow: GenerateFlowFile -> PartitionRecord -> RouteOnAttribute]

GenerateFlowFile:

[screenshot of the GenerateFlowFile configuration]
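
GenerateFlowFile is only used here as a test data source; roughly, the example CSV from above is placed in its Custom Text property (in a real flow you would pick up the files with ListFile/FetchFile instead):

    Custom Text:
        id,city,country
        1,Berlin,Germany
        2,Paris,France
        3,New York,USA
        4,Frankfurt,Germany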

PartitionRecord:

[screenshot of the PartitionRecord configuration]
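
The essential PartitionRecord configuration is a record reader, a record writer, and one user-defined property whose value is a RecordPath; a sketch of what the screenshot most likely shows:

    Record Reader    CSVReader
    Record Writer    CSVRecordSetWriter
    country          /country

The name of the user-defined property (country) becomes the attribute written on each outgoing flowfile, and the RecordPath /country selects the column to partition on.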

CSVReader should be set up to infer the schema and CSVRecordSetWriter to inherit the schema. PartitionRecord will group the records by country and pass each group on together with a country attribute that holds the country value. You will see the following groups of records:

id,city,country
1,Berlin,Germany
4,Frankfurt,Germany

id,city,country
2,Paris,France

id,city,country
3,New York,USA

Each group is a flowfile and will have the country attribute, which you will use to route the groups.

RouteOnAttribute:

[screenshot of the RouteOnAttribute configuration]
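
RouteOnAttribute then needs a single user-defined property that evaluates the country attribute with expression language; the relationship name and the country list below are just assumptions for this example:

    Routing Strategy    Route to Property name
    is_europe           ${country:equals('Germany'):or(${country:equals('France')})}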

All countries from Europe will be routed to the is_europe relationship. Now you can apply the same strategy to your use case.
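
Translated to your CSV, the same pattern would look roughly like this (the column name is a guess and depends on the schema your reader infers for the 60th column, and the in() expression language function requires a reasonably recent NiFi version):

    PartitionRecord:   valid_flag    /column60
    RouteOnAttribute:  valid         ${valid_flag:in('0','1','6','7')}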