
I'm using the ReplaceText processor to replace a few columns in a 44-column, comma-delimited file.

Each flow file has just one row with 44 fields.

In the ReplaceText processor, I need to change the 3rd column in the flow file to the value of an attribute. To do that, I split the row into 4 groups with a regular expression and replace only the 2nd group with the attribute.

The processor hangs when I do this. How can I replace the nth column with a specific attribute or string?
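The exact pattern from the question is not shown, so this is only an illustration of the grouping idea. It is a minimal Java sketch (Java because ReplaceText uses Java regular expressions) that captures everything before the 3rd field, the 3rd field itself, and the rest of the row, then rebuilds the row with a new value. The class and method names are hypothetical. Anchoring with `^` and using `[^,]*` instead of `.*` means no group can run past a delimiter. That is one common way to avoid the heavy backtracking that can make a regex appear to hang. Whether backtracking caused the hang here is an assumption.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReplaceThirdColumn {
    // Group 1: the first two fields plus their trailing commas.
    // Group 2: the 3rd field. Group 3: everything after it.
    // [^,]* cannot cross a delimiter, so there is little for the engine to backtrack over.
    // DOTALL lets (.*) also consume a trailing newline at the end of the row.
    private static final Pattern THIRD_FIELD =
            Pattern.compile("^((?:[^,]*,){2})([^,]*)(.*)$", Pattern.DOTALL);

    // Replaces the 3rd comma-delimited field of row with value.
    // Returns the row unchanged if it has fewer than three fields.
    public static String replaceThird(String row, String value) {
        Matcher m = THIRD_FIELD.matcher(row);
        if (!m.matches()) {
            return row;
        }
        // Plain concatenation, so a '$' inside value is not treated as a back-reference.
        return m.group(1) + value + m.group(3);
    }

    public static void main(String[] args) {
        // "myfile.csv" stands in for a hypothetical attribute value.
        System.out.println(replaceThird("a,b,c,d,e", "myfile.csv")); // a,b,myfile.csv,d,e
    }
}
```

In ReplaceText terms, this would presumably correspond to a Regex Replace with the pattern above as the Search Value and `$1${myAttributeName}$3` as the Replacement Value, where `myAttributeName` is a hypothetical attribute name.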


2 Answers


What version of NiFi are you using? As of NiFi 1.3.0, you can use the "record-aware" processors such as UpdateRecord. You would configure a CSVReader, possibly by inferring string fields from the header line or providing your own Avro schema for the fields. Let's say the name of the desired column/field is "fname". In UpdateRecord you can set the Replacement Strategy to "Literal" and add a user-defined property called "/fname" with a value of "${filename}". This should allow you to update the CSV file in place, without having to split lines or deal with regular expressions to parse the row.
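To make the difference from a positional regex concrete, here is a plain-Java approximation of what the "/fname" property achieves: find the column by its header name, then overwrite that field in every data row. This is not NiFi code and not how UpdateRecord is implemented. It assumes an unquoted CSV with a header line, and the class and method names are hypothetical. CSVReader additionally handles quoting, escaping and schemas, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UpdateByFieldName {
    // Replaces the named column in every data row, locating it through the
    // header line instead of a hardcoded position.
    // Naive split: quoted fields containing commas are NOT handled here.
    public static List<String> updateField(List<String> lines, String field, String value) {
        if (lines.isEmpty()) {
            return lines;
        }
        String[] header = lines.get(0).split(",", -1);
        int index = Arrays.asList(header).indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("No column named " + field);
        }
        List<String> out = new ArrayList<>();
        out.add(lines.get(0)); // the header line is kept unchanged
        for (String line : lines.subList(1, lines.size())) {
            String[] cols = line.split(",", -1); // -1 keeps trailing empty fields
            if (index < cols.length) {
                cols[index] = value;
            }
            out.add(String.join(",", cols));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> csv = List.of("id,fname,size", "1,old.txt,10", "2,,20");
        updateField(csv, "fname", "new.txt").forEach(System.out::println);
    }
}
```

Because the column is addressed by name, the same configuration keeps working if columns are added or reordered, which a positional regex does not.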


Note: If you are using Apache NiFi 1.3.0 or later, Matt's approach is better.

My recommendation is to use an ExecuteScript processor with Groovy. You could probably craft a regular expression that matches what you are looking for, but as you noticed, the performance is not going to be good, and a larger flow file could exhaust the heap.

In Groovy (or Python/Ruby/etc.), this would be a simple string replacement operation like so:

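import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if (!flowFile) return

// Read the replacement value up front; route to failure if the attribute is missing
def replacement = flowFile.getAttribute("myAttributeName")
if (replacement == null) {
    session.transfer(flowFile, REL_FAILURE)
    return
}

flowFile = session.write(flowFile, { inputStream, outputStream ->
    // split(",", -1) keeps trailing empty fields, so the column count is preserved
    def elements = IOUtils.toString(inputStream, StandardCharsets.UTF_8).split(",", -1)
    // Rather than hardcoding, you could make the column index also read from a flowfile attribute to make this more generic
    elements[2] = replacement
    def outputString = elements.join(",")
    outputStream.write(outputString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)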
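The core of the script is a pure string transformation, so it can be tried outside NiFi. Below is a Java sketch of the same split/replace/join step that takes the column index as a parameter, as the comment in the script suggests. The class and method names are hypothetical. It also shows why the split limit matters: Groovy's `String.split` is Java's, and without a `-1` limit trailing empty fields are dropped, so a row ending in empty columns would come back shorter than it went in.

```java
import java.util.Arrays;

public class ReplaceNthColumn {
    // Replaces the field at a zero-based index in a comma-delimited row.
    // split(",", -1) keeps trailing empty fields, so "a,b,c,," has 5 fields
    // and the column count is preserved when the row is joined back together.
    public static String replaceColumn(String row, int index, String value) {
        String[] fields = row.split(",", -1);
        if (index < 0 || index >= fields.length) {
            throw new IndexOutOfBoundsException(
                    "Row has " + fields.length + " fields, no index " + index);
        }
        fields[index] = value;
        return String.join(",", fields);
    }

    public static void main(String[] args) {
        // Index 2 is the 3rd column, as in the Groovy script.
        System.out.println(replaceColumn("a,b,c,,", 2, "X")); // a,b,X,,
        // Plain split(",") drops the two trailing empty fields:
        System.out.println(Arrays.toString("a,b,c,,".split(","))); // [a, b, c]
    }
}
```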