I am having the input payload {"ENV": {"MAC": "6CECEB5D0302","NS": "NM","DATE": "170619","TIME": "114635","PM2.5": "10","PM10": "8.272681196722012"}}.
I have used the below script for creating multiple flowfiles and it worked fine. But same code not working for transferring one flowfile. In the below code based on the if code I wanted the script should either transfer flow to next processor or it should just log info and drop that flowfile.
import groovy.json.*;
import org.apache.commons.io.IOUtils;
import java.nio.charset.*;
import java.lang.*;
def flowFile = session.get();
if (flowFile == null) {
return;
}
def flowFiles = [] as List<FlowFile>
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
try{
def data = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
def input = new JsonSlurper().parseText( data );
if( Double.parseDouble(input.ENV["PM2.5"])>7.2) {
Map out =
[ENVIRONMENT: [
DATE: input.ENV.DATE,
"PM2.5": input.ENV["PM2.5"],
TIME: input.ENV.TIME,
DATE_TIME: input.ENV.DATE,
NS: input.ENV.NS,
MAC: input.ENV.MAC
]];
String finalJson = JsonOutput.toJson out;
outputStream.write(finalJson.getBytes(StandardCharsets.UTF_8))
flowFiles << flowFile
} else {
log.info('else condition executed')
}
} catch(Exception e){}
} as StreamCallback)
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)
Expected output:
{"ENVIRONMENT":{"DATE":"170619","PM2.5":"10","TIME":"114635","DATE_TIME":"170619","NS":"NM","MAC":"6CECEB5D0302"}}
But if the input is having PM2.5 less than 7.2, it should drop that flowfile.
After removing empty try and catch exception log is:
2019-10-07 12:08:29,237 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=716b08ff-7a78-3806-bdd6-99d57cfd7cb7] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer: org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:248)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.script.ScriptException: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:162)
at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264)
at org.apache.nifi.script.impl.GroovyScriptEngineConfigurator.eval(GroovyScriptEngineConfigurator.java:54)
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:232)
... 11 common frames omitted
Caused by: javax.script.ScriptException: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:324)
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:159)
... 14 common frames omitted
Caused by: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=c8217e1d-ec25-4017-a175-25ef12ad64ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1570308702876-1, container=default, section=1], offset=53731, length=114],offset=0,name=c8217e1d-ec25-4017-a175-25ef12ad64ba,size=114] is already marked for transfer
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3139)
at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:3121)
at org.apache.nifi.controller.repository.StandardProcessSession.remove(StandardProcessSession.java:1979)
at org.apache.nifi.processor.ProcessSession$remove$3.call(Unknown Source)
at Script107.run(Script107.groovy:40)
at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:321)
... 15 common frames omitted
Same code I used for generating one flowfile with multiple output this is working in both cases 1)if it satisfies condition it will generate the multiple output based on the input. 2)it'll drop if it not satisfies.:
Input:
{
"ENV": [{
"MAC": "6CECEB5D0302",
"NS": "NM",
"DATE": "170619",
"TIME": "114635",
"PM2.5": "7.222410585417936",
"PM10": "8.272681196722012"
},
{
"MAC": "6CECEB5D0302",
"NS": "NM",
"DATE": "170619",
"TIME": "114635",
"PM2.5": "7.222410585417936",
"PM10": "8.272681196722012"
}
]
}
The above input will generate two flowfile as 7.22... is greater than 7.2. If the value for PM2.5 is less than 7.2 then it won't generate any transfer flowfile for next processor I mean it'll drop there itself.
import groovy.json.*;
import org.apache.commons.io.IOUtils;
import java.nio.charset.*;
def flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
def inputStream = session.read(flowFile)
def writer = new StringWriter();
try {
IOUtils.copy(inputStream, writer, "UTF-8");
data=writer.toString();
inputStream.close();
def input = new JsonSlurper().parseText( data );
for (def i=0;i<input.ENV.size();i++){
if( Double.parseDouble(input.ENV[i]["PM2.5"])>7.2) {
def newFlowFile = session.create(flowFile);
Map out =
[ENVIRONMENT: [
DATE: input.ENV[i].DATE,
"PM2.5": input.ENV[i]["PM2.5"],
TIME: input.ENV[i].TIME,
DATE_TIME: input.ENV[i].DATE,
NS: input.ENV[i].NS,
MAC: input.ENV[i].MAC
]];String finalJson = JsonOutput.toJson out;
newFlowFile = session.write(newFlowFile, { outputStream ->
outputStream.write( finalJson.getBytes(StandardCharsets.UTF_8) )
} as OutputStreamCallback)
flowFiles << newFlowFile
}
else{
}
}} catch (IOException e) {
}
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)
1.you read incoming flow file intoflowFile.2.then you write something into incoming flow fileflowFile = session.write(flowFile....3.then you add incoming flow file into an arrayflowFiles << flowFile.4.then you transfer incoming flow file stored in array to success5.then you try to remove the same incoming flow file.... so you got the errorflow file is already marked for transfer. could you explain what you a trying to do. what should be the result for your input... - daggett