My input payload is {"ENV": {"MAC": "6CECEB5D0302","NS": "NM","DATE": "170619","TIME": "114635","PM2.5": "10","PM10": "8.272681196722012"}}.

I used a similar script to create multiple flowfiles and it worked fine, but the same approach does not work when transferring a single flowfile. In the code below, depending on the if condition, the script should either transfer the flowfile to the next processor or just log a message and drop the flowfile.

import groovy.json.*;
import org.apache.commons.io.IOUtils;
import java.nio.charset.*;
import java.lang.*;

def flowFile = session.get();
if (flowFile == null) {
    return;
}
def flowFiles = [] as List<FlowFile>
flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        try{
            def data = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
            def input = new JsonSlurper().parseText( data );
            if( Double.parseDouble(input.ENV["PM2.5"])>7.2) {
                Map out =
                    [ENVIRONMENT: [
                      DATE: input.ENV.DATE,
                      "PM2.5": input.ENV["PM2.5"],
                      TIME: input.ENV.TIME,
                      DATE_TIME: input.ENV.DATE,
                      NS: input.ENV.NS,
                      MAC: input.ENV.MAC
                    ]];
                String finalJson = JsonOutput.toJson out;
                outputStream.write(finalJson.getBytes(StandardCharsets.UTF_8))
                flowFiles << flowFile
            } else {
                log.info('else condition executed')
            }
        } catch(Exception e){}
    } as StreamCallback)
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)

Expected output:

{"ENVIRONMENT":{"DATE":"170619","PM2.5":"10","TIME":"114635","DATE_TIME":"170619","NS":"NM","MAC":"6CECEB5D0302"}}

But if the PM2.5 value in the input is less than 7.2, the flowfile should be dropped.
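The keep-or-drop rule and the output mapping can be tried outside NiFi first. The following is a minimal sketch in plain Groovy, assuming the single-object payload shown at the top of the question; `toEnvironmentJson` and its `threshold` parameter are hypothetical names introduced only for illustration, and `null` stands in for "drop the flowfile".

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper (not part of NiFi): returns the output JSON string,
// or null when the record should be dropped.
String toEnvironmentJson(String payload, double threshold = 7.2d) {
    def env = new JsonSlurper().parseText(payload).ENV
    // PM2.5 arrives as a string, so parse it before comparing
    if (Double.parseDouble(env['PM2.5'] as String) <= threshold) {
        return null
    }
    // A map literal keeps insertion order, so keys are emitted in this order
    def out = [ENVIRONMENT: [
        DATE     : env.DATE,
        'PM2.5'  : env['PM2.5'],
        TIME     : env.TIME,
        DATE_TIME: env.DATE,
        NS       : env.NS,
        MAC      : env.MAC
    ]]
    return JsonOutput.toJson(out)
}

def sample = '{"ENV": {"MAC": "6CECEB5D0302","NS": "NM","DATE": "170619","TIME": "114635","PM2.5": "10","PM10": "8.272681196722012"}}'
println toEnvironmentJson(sample)
```

Like the script in the question, this keeps a record only when PM2.5 is strictly greater than 7.2, so a value of exactly 7.2 is dropped.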

After removing the empty try-catch, the exception log is:

2019-10-07 12:08:29,237 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=716b08ff-7a78-3806-bdd6-99d57cfd7cb7] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer: org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
    at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:248)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:162)
    at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264)
    at org.apache.nifi.script.impl.GroovyScriptEngineConfigurator.eval(GroovyScriptEngineConfigurator.java:54)
    at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:232)
    ... 11 common frames omitted
Caused by: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:324)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:159)
    ... 14 common frames omitted
Caused by: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3139)
    at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3121)
    at org.apache.nifi.controller.repository.StandardProcessSession.remove(StandardProcessSession.java:1979)
    at org.apache.nifi.processor.ProcessSession$remove$3.call(Unknown Source)
    at Script107.run(Script107.groovy:40)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:321)
    ... 15 common frames omitted

I used the same code to generate multiple output flowfiles from one input flowfile, and it works in both cases: 1) if the condition is satisfied, it generates multiple outputs based on the input; 2) if it is not satisfied, it drops the flowfile.

Input:

 {
    "ENV": [{
            "MAC": "6CECEB5D0302",
            "NS": "NM",
            "DATE": "170619",
            "TIME": "114635",
            "PM2.5": "7.222410585417936",
            "PM10": "8.272681196722012"
        },
        {
            "MAC": "6CECEB5D0302",
            "NS": "NM",
            "DATE": "170619",
            "TIME": "114635",
            "PM2.5": "7.222410585417936",
            "PM10": "8.272681196722012"
        }
    ]
}

The above input generates two flowfiles, because 7.22... is greater than 7.2. If the PM2.5 value is less than 7.2, no flowfile is transferred to the next processor; it is dropped right there.

import groovy.json.*;
import org.apache.commons.io.IOUtils;
import java.nio.charset.*;

def flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
def inputStream = session.read(flowFile)

def writer = new StringWriter();
try {
    IOUtils.copy(inputStream, writer, "UTF-8");
    data = writer.toString();
    inputStream.close();
    def input = new JsonSlurper().parseText( data );
    for (def i = 0; i < input.ENV.size(); i++) {
        if( Double.parseDouble(input.ENV[i]["PM2.5"]) > 7.2) {
            def newFlowFile = session.create(flowFile);
            Map out =
                [ENVIRONMENT: [
                  DATE: input.ENV[i].DATE,
                  "PM2.5": input.ENV[i]["PM2.5"],
                  TIME: input.ENV[i].TIME,
                  DATE_TIME: input.ENV[i].DATE,
                  NS: input.ENV[i].NS,
                  MAC: input.ENV[i].MAC
                ]];
            String finalJson = JsonOutput.toJson out;
            newFlowFile = session.write(newFlowFile, { outputStream ->
                outputStream.write( finalJson.getBytes(StandardCharsets.UTF_8) )
            } as OutputStreamCallback)
            flowFiles << newFlowFile
        } else {
        }
    }
} catch (IOException e) {

}
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)
Remove the empty try-catch and show the real error. - daggett
I have updated the question with the log after removing the try-catch. - ashok
Your algorithm: 1. you read the incoming flow file into flowFile; 2. then you write something into the incoming flow file (flowFile = session.write(flowFile, ...)); 3. then you add the incoming flow file to the array (flowFiles << flowFile); 4. then you transfer the incoming flow file stored in the array to success; 5. then you try to remove the same incoming flow file. So you get the error that the flow file is already marked for transfer. Could you explain what you are trying to do? What should the result be for your input? - daggett
I have a requirement to generate the Groovy script from Java code, so I am using the same Groovy template for both one-to-one and one-to-many mapping (one-to-many flowfile generation uses a loop). If the condition in the if block is not satisfied, the else branch should drop the flowfile and log its content, in both the one-to-one and the one-to-many case. I want one or two Groovy script templates that work for both one-to-one and one-to-many flowfile generation with a condition. - ashok
Please edit your question and provide an example of the input and the expected multiple outputs. - daggett

1 Answer

The problem in your code is that you transfer the flow file to success and then remove the same flow file.

Only one of those operations can be applied to a given flow file.
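To make the rule concrete, here is a toy sketch in plain Groovy. `ToySession` and `finish` are hypothetical stand-ins written for this illustration, not the NiFi `ProcessSession`; the toy only records one final action per flow file and rejects a second one, which is the same kind of conflict the log above reports as "is already marked for transfer".

```groovy
// Toy stand-in for a session (hypothetical, NOT the NiFi class): it only
// tracks which flow files already have a final action recorded.
class ToySession {
    Map<String, String> actions = [:]

    private void mark(String flowFile, String action) {
        if (actions.containsKey(flowFile)) {
            throw new IllegalStateException("${flowFile} is already marked for ${actions[flowFile]}")
        }
        actions[flowFile] = action
    }

    void transfer(String flowFile) { mark(flowFile, 'transfer') }
    void remove(String flowFile)   { mark(flowFile, 'remove') }
}

// Choose exactly one final action per flow file.
def finish(ToySession session, String flowFile, boolean keep) {
    if (keep) {
        session.transfer(flowFile)
    } else {
        session.remove(flowFile)
    }
}

def session = new ToySession()
finish(session, 'ff-1', true)   // condition satisfied: transfer
finish(session, 'ff-2', false)  // condition not satisfied: drop
println session.actions

// Repeating the mistake from the question: remove a file that was transferred
def failed = false
try {
    session.remove('ff-1')
} catch (IllegalStateException e) {
    failed = true
    println e.message
}
```

In the one-to-one case this means deciding between transfer and remove inside the if/else, rather than always calling both at the end of the script.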

It seems you just forgot to create a new flow file.

Try changing this part of the code:

String finalJson = JsonOutput.toJson out;
outputStream.write(finalJson.getBytes(StandardCharsets.UTF_8))
flowFiles << flowFile

to this:

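def outFile = session.create(flowFile) //create new flow file
//write the json into the new flow file (session.write returns the updated flow file)
outFile = session.write(outFile, { os -> os.write(JsonOutput.toJson(out).getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
flowFiles << outFile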

You could also simplify your code. The following example works with the ExecuteGroovyScript processor:

import groovy.json.*;

def flowFile = session.get();
if (flowFile == null)return;
//read and parse input file
def data = flowFile.read().withReader("UTF-8"){r-> new JsonSlurper().parse(r) }

data.ENV.each{item->
    if( (item."PM2.5" as Double) > 7.2  ){
        def outFile = flowFile.clone(false)     //clone flowfile without content
        def outData = [ENVIRONMENT: item]       //combine new json
        outFile.write("UTF-8"){w-> 
            new JsonBuilder(outData).writeTo(w) //write json as out file content
        }
        REL_SUCCESS << outFile                  //transfer to success
    }
}
flowFile.remove()
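The loop above expects ENV to be an array. For a single template that also accepts the one-object payload from the question, one option is to wrap a lone object in a list before iterating. The sketch below is plain Groovy that runs outside NiFi; `selectRecords` is a hypothetical helper name, and it follows the `[ENVIRONMENT: item]` shape used above, so every input field (including PM10) is kept.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: one output JSON string per record above the threshold.
List<String> selectRecords(String payload, double threshold = 7.2d) {
    def env = new JsonSlurper().parseText(payload).ENV
    // Wrap a single object in a list so both input shapes share one loop
    def items = (env instanceof List) ? env : [env]
    def kept = items.findAll { Double.parseDouble(it['PM2.5'] as String) > threshold }
    return kept.collect { JsonOutput.toJson([ENVIRONMENT: it]) }
}

def single = '{"ENV": {"MAC": "6CECEB5D0302", "PM2.5": "10"}}'
def many   = '{"ENV": [{"MAC": "A", "PM2.5": "7.22"}, {"MAC": "B", "PM2.5": "3"}]}'

println selectRecords(single)  // one record kept
println selectRecords(many)    // only the record with MAC "A" is kept
```

In a NiFi script, each returned string would become the content of one new flow file, and an empty list would mean that only the incoming flow file is removed.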