
I am using Apache NiFi to publish logs to Kafka.

I have log lines that, depending on their content, must be sent to one Kafka topic or another. The problem is that there are many topics, so I would need many processors.

My idea is to use ExecuteScript to check the conditions in the log text and generate a dynamic attribute that I can reference in the 'Topic Name' property of the PublishKafka processor.

I have started with code that reads the content of the flow file and checks some of the conditions, but I don't know how to generate the attribute.

Some examples of the log lines:

Jim,18,M,156,Oregon,USA,etc
John,55,M,170,Idaho,USA,etc

Here's what I have so far:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the whole flow file content as a UTF-8 string
        log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        fields = log.split(',')
        name = fields[0]
        age = fields[1]
        sex = fields[2]

        # Depending on the fields, the line should go to a different topic
        if name == 'John' and age == '30':
            # should go to Topic_A
            outputStream.write(log.encode('utf-8'))
        elif name == 'Max' and age == '25':
            # should go to Topic_B
            outputStream.write(log.encode('utf-8'))
        elif name == 'Smith' and age in ('10', '20'):
            # should go to Topic_C; note that "age == '10' or '20'"
            # would always be true, a membership test is needed here
            outputStream.write(log.encode('utf-8'))

The goal is to have a single ExecuteScript processor and a single PublishKafka processor. Can someone help me?


1 Answer


The short answer is:

flowFile = session.putAttribute(flowFile, 'my-property', 'my-value')

https://funnifi.blogspot.com/2016/02/executescript-processor-hello-world.html
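Putting the two pieces together, the topic decision can be factored into a plain function, and the attribute is then set from its return value. This is only a sketch: the topic names (Topic_A, Topic_B, Topic_C) and the conditions are taken from the question, and the NiFi-specific calls are shown as comments because they only run inside ExecuteScript:

```python
def choose_topic(line):
    """Pick a Kafka topic name based on the first two CSV fields.

    Returns None when no rule matches, so the caller can route
    unmatched lines to a failure or default relationship.
    """
    fields = line.split(',')
    name, age = fields[0], fields[1]
    if name == 'John' and age == '30':
        return 'Topic_A'
    elif name == 'Max' and age == '25':
        return 'Topic_B'
    elif name == 'Smith' and age in ('10', '20'):
        return 'Topic_C'
    return None

# Inside the ExecuteScript (Jython) body you would read the flow file
# content, call choose_topic(text), and attach the result as an attribute:
#   flowFile = session.putAttribute(flowFile, 'topic', choose_topic(text))
#   session.transfer(flowFile, REL_SUCCESS)
# PublishKafka's "Topic Name" property can then be set to ${topic}.
```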

A typical way to do this without writing any custom code is to use RouteOnContent...

You can add user-defined properties where each property name becomes a relationship and its value is a regular expression.

For example, adding two properties:

john = John,30,.*
max = Max,25,.*

From there you would send each relationship to an UpdateAttribute processor that sets the topic name: john would go to an UpdateAttribute that sets topic = Topic_A, and max to one that sets topic = Topic_B.
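Those property values are ordinary regular expressions matched against the flow file content. A quick Python check of the same two patterns, assuming one log line per flow file (Python's re.match anchors at the start of the string, which is close enough here to illustrate the routing):

```python
import re

# The same patterns used as RouteOnContent property values above;
# each dict key plays the role of the relationship name.
patterns = {'john': r'John,30,.*', 'max': r'Max,25,.*'}

line = 'John,30,M,170,Idaho,USA'
matched = [name for name, pat in patterns.items() if re.match(pat, line)]
print(matched)  # only the 'john' relationship matches this line
```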

Then both UpdateAttribute processors would connect to a single PublishKafka processor whose Topic Name is set to ${topic}.