0
votes

I've a flowfile and a attribute (which is list in datatype). I'm trying to duplicate the flowfile for each element in attribute.

This is how it goes- attribute (tia_clients: ['A', 'B', 'C']) has a list of elements. For every element in a list, I'm trying to duplicate the flowfile. Which means, generate a flowfile with attribute 'A', and Flowfile with Attribute 'B' and so on...

Here's the code, I've put together. Generating flowfiles equal to the size of attribute (list). however, flowfile is 0 bytes (not getting generated).

What is that I'm missing?

import sys
sys.path.append('/usr/lib/python2.7/site-packages')
from picoredis import Redis
reload(sys)
sys.setdefaultencoding('utf8')
import ast

import sys
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback, StreamCallback


# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.charset = StandardCharsets.UTF_8
        self.parentFlowFile = None
    
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(text.encode('utf-8'))
        
#Get the flow file
parentFlowFile = session.get()

#check if the flowfile is not blank
if(parentFlowFile != None):
    #Enclose the code in try block to catch the thrown exceptions
    try:
        flowfiles_list = []
        #Call the class which does the parsing
        splitCallback = PyStreamCallback()
        splitCallback.parentFlowFile = parentFlowFile
        clients = parentFlowFile.getAttribute('tia_clients')
        clients = ast.literal_eval(clients)
                            
        for client in clients:
            flowFile = session.create(parentFlowFile)
            if (flowFile != None):
                try:
                    flowFile = session.write(flowFile, splitCallback)
                    flowFile = session.putAttribute(flowFile, unicode('client').encode('utf-8'), unicode(client).encode('utf-8'))
                    flowfiles_list.append(flowFile)
                  
                except Exception as e:
                    session.remove(flowFile)
                    raise

        for flow in flowfiles_list:
            session.transfer(flow, REL_SUCCESS)
             
    #The except block to catch the exceptions thrown by the try block
    except Exception as e:
        parentFlowFile = session.putAttribute(parentFlowFile, unicode('Error').encode('utf-8'), unicode(e).encode('utf-8'))
        tb = sys.exc_info()[2]
        parentFlowFile = session.putAttribute(parentFlowFile, unicode('Error line number').encode('utf-8'), unicode(tb.tb_lineno).encode('utf-8')) 
        #if there are exceptions thrown, the flow file will be moved to the failure queue
        session.transfer(parentFlowFile, REL_FAILURE)
        
session.remove(parentFlowFile)