I've a flowfile and a attribute (which is list in datatype). I'm trying to duplicate the flowfile for each element in attribute.
This is how it goes- attribute (tia_clients: ['A', 'B', 'C']) has a list of elements. For every element in a list, I'm trying to duplicate the flowfile. Which means, generate a flowfile with attribute 'A', and Flowfile with Attribute 'B' and so on...
Here's the code, I've put together. Generating flowfiles equal to the size of attribute (list). however, flowfile is 0 bytes (not getting generated).
What is that I'm missing?
import sys
sys.path.append('/usr/lib/python2.7/site-packages')
from picoredis import Redis
reload(sys)
sys.setdefaultencoding('utf8')
import ast
import sys
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback, StreamCallback
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
def __init__(self):
self.charset = StandardCharsets.UTF_8
self.parentFlowFile = None
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(text.encode('utf-8'))
#Get the flow file
parentFlowFile = session.get()
#check if the flowfile is not blank
if(parentFlowFile != None):
#Enclose the code in try block to catch the thrown exceptions
try:
flowfiles_list = []
#Call the class which does the parsing
splitCallback = PyStreamCallback()
splitCallback.parentFlowFile = parentFlowFile
clients = parentFlowFile.getAttribute('tia_clients')
clients = ast.literal_eval(clients)
for client in clients:
flowFile = session.create(parentFlowFile)
if (flowFile != None):
try:
flowFile = session.write(flowFile, splitCallback)
flowFile = session.putAttribute(flowFile, unicode('client').encode('utf-8'), unicode(client).encode('utf-8'))
flowfiles_list.append(flowFile)
except Exception as e:
session.remove(flowFile)
raise
for flow in flowfiles_list:
session.transfer(flow, REL_SUCCESS)
#The except block to catch the exceptions thrown by the try block
except Exception as e:
parentFlowFile = session.putAttribute(parentFlowFile, unicode('Error').encode('utf-8'), unicode(e).encode('utf-8'))
tb = sys.exc_info()[2]
parentFlowFile = session.putAttribute(parentFlowFile, unicode('Error line number').encode('utf-8'), unicode(tb.tb_lineno).encode('utf-8'))
#if there are exceptions thrown, the flow file will be moved to the failure queue
session.transfer(parentFlowFile, REL_FAILURE)
session.remove(parentFlowFile)