I am using Apache NiFi to store logs in Kafka.
I have log lines that, depending on their content, must be sent to one Kafka topic or another. The problem is that there are many topics, so I would need many processors.
I thought that with ExecuteScript I could, based on conditions over the log text, generate a dynamic attribute to use in the 'Topic Name' property of the PublishKafka processor.
I started with the code to read the flowfile content and to write some of the conditions, but I don't know how to generate the attribute.
Some examples of the log lines:

Jim,18,M,156,Oregon,USA,etc
John,55,M,170,Idaho,USA,etc
Here's what I have so far:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the flowfile content and split the CSV fields
        Log = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        TextLog = str(Log).split(',')
        name = TextLog[0]
        age = TextLog[1]
        sex = TextLog[2]
        if name == 'John' and age == '30':
            Topic_A = str(TextLog)
            outputStream.write(bytearray(Topic_A.encode('utf-8')))
        elif name == 'Max' and age == '25':
            Topic_B = str(TextLog)
            outputStream.write(bytearray(Topic_B.encode('utf-8')))
        elif name == 'Smith' and (age == '10' or age == '20'):
            # note: "age == '10' or '20'" was always true because of
            # operator precedence, so every line fell into this branch
            Topic_C = str(TextLog)
            outputStream.write(bytearray(Topic_C.encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
The goal is to have a single ExecuteScript processor and a single PublishKafka processor. Can someone please help me?
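From what I have read about ExecuteScript, I imagine the attribute could be set with `session.putAttribute` and then referenced in PublishKafka's 'Topic Name' property as `${kafka.topic}`, but I am not sure this is the right approach. Here is a minimal sketch of the routing logic I have in mind; the attribute name `kafka.topic`, the topic names, and the fallback topic are placeholders I made up:

```python
# Pure routing logic: decide the topic from the parsed fields.
# (Testable outside NiFi; the topic names here are my own placeholders.)
def pick_topic(name, age):
    if name == 'John' and age == '30':
        return 'Topic_A'
    elif name == 'Max' and age == '25':
        return 'Topic_B'
    elif name == 'Smith' and age in ('10', '20'):
        return 'Topic_C'
    return 'Topic_default'  # fallback for unmatched lines (my assumption)

# Inside ExecuteScript (Jython) I imagine it would then be used like this,
# where `session` and REL_SUCCESS are provided by the processor:
#
#   flowFile = session.get()
#   if flowFile is not None:
#       # ... read content into `Log` as above ...
#       fields = str(Log).split(',')
#       topic = pick_topic(fields[0], fields[1])
#       flowFile = session.putAttribute(flowFile, 'kafka.topic', topic)
#       session.transfer(flowFile, REL_SUCCESS)
#
# and PublishKafka's 'Topic Name' property would be set to ${kafka.topic}.
```

Is something like this correct, or is there a better way to expose the topic to PublishKafka?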