I am exploring how to write unit tests for Apache NiFi processors so that I can avoid the "change code, build NAR, paste NAR into the lib folder, restart NiFi" loop. For this, I believe I also need to capture flow files to local disk and reload them each time I run a unit test. I came across this article, which suggests serializing flow files to disk and then, in the unit test, reading those files back and enqueuing them to the custom processor I am currently developing. The article says to use MergeContent with the FlowFileV3 option and then PutFile. I was able to persist these files in .pkg format. I am reading them back in my unit test as suggested in the same article, using the GetFile, IdentifyMimeType and UnpackContent processors. However, I am chaining these processors in code, as below:
//Get File
TestRunner getFileRunner = TestRunners.newTestRunner(new GetFile());
getFileRunner.setProperty(GetFile.DIRECTORY, "C:\\Mahesh\\delete\\serialized-flow-file-2");
getFileRunner.setProperty(GetFile.KEEP_SOURCE_FILE, "true");
getFileRunner.run(1);
List<MockFlowFile> getFileResult = getFileRunner.getFlowFilesForRelationship(GetFile.REL_SUCCESS);
List<? extends FlowFile> getFileFFResult = getFileResult;
//IdentifyMimeType
TestRunner identifyMimeTypeRunner = TestRunners.newTestRunner(new IdentifyMimeType());
identifyMimeTypeRunner.enqueue(getFileFFResult.toArray(new FlowFile[getFileFFResult.size()]));
identifyMimeTypeRunner.run(1);
List<MockFlowFile> identifyMimeTypeResult = identifyMimeTypeRunner.getFlowFilesForRelationship(IdentifyMimeType.REL_SUCCESS);
List<? extends FlowFile> identifyMimeTypeFFResult = identifyMimeTypeResult;
//UnpackContent
TestRunner unpackContentRunner = TestRunners.newTestRunner(new UnpackContent());
unpackContentRunner.enqueue(identifyMimeTypeFFResult.toArray(new FlowFile[identifyMimeTypeFFResult.size()]));
unpackContentRunner.run(1);
List<MockFlowFile> unpackContentResult = unpackContentRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
List<? extends FlowFile> unpackContentFFResult = unpackContentResult;
However, I am getting the following exception:
17:39:36.676 [pool-1-thread-1] INFO org.apache.nifi.processors.standard.GetFile - GetFile[id=2e2161db-48a7-4a13-b7dd-ec75ce2b30dc] added FlowFile[0,618912147321300.pkg,556530B] to flow
17:40:08.772 [pool-2-thread-1] INFO org.apache.nifi.processors.standard.IdentifyMimeType - IdentifyMimeType[id=aefc3abe-0820-48a0-8935-e905aeadb191] Identified FlowFile[0,618912147321300.pkg,556530B] as having MIME Type application/flowfile-v3
17:40:48.625 [pool-3-thread-1] ERROR org.apache.nifi.processors.standard.UnpackContent - UnpackContent[id=6840bef5-4e52-48ac-be2f-1f9580eeb144] UnpackContent[id=6840bef5-4e52-48ac-be2f-1f9580eeb144] failed to process due to java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed; rolling back session: java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
17:40:48.630 [pool-3-thread-1] ERROR org.apache.nifi.processors.standard.UnpackContent -
java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.util.MockProcessSession.validateState(MockProcessSession.java:1014)
at org.apache.nifi.util.MockProcessSession.putAllAttributes(MockProcessSession.java:488)
at org.apache.nifi.util.MockProcessSession.inheritAttributes(MockProcessSession.java:1044)
at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:299)
at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:62)
at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker$1.process(UnpackContent.java:415)
at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:547)
at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:529)
at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker.unpack(UnpackContent.java:409)
at org.apache.nifi.processors.standard.UnpackContent.onTrigger(UnpackContent.java:255)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:251)
at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:245)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
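Reading this trace from the bottom up, UnpackContent.onTrigger opens the incoming FlowFile with MockProcessSession.read, and inside that read callback it calls MockProcessSession.create, which goes through inheritAttributes and putAllAttributes into validateState, where the exception is thrown. I also notice that both FlowFiles in the log are printed with the same leading 0. If that number is an ID that the mock session uses to track active reads (an assumption on my part; I have not verified it in the NiFi source), then the newly created child could be mistaken for the FlowFile that is still being read. The sketch below is only a toy model of that idea in plain Java, not NiFi code; ToySession, read and putAttribute are hypothetical names made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy stand-in for a mock session. NOT NiFi code; all names are hypothetical. */
final class ToySession {
    // IDs of flow files that currently have a read callback running.
    private final Set<Long> activeReads = new HashSet<>();

    /** Runs the callback while the given ID is marked as "being read". */
    void read(long flowFileId, Runnable callback) {
        activeReads.add(flowFileId);
        try {
            callback.run();
        } finally {
            activeReads.remove(flowFileId);
        }
    }

    /** Any modification first checks that the ID is not currently being read. */
    void putAttribute(long flowFileId, String key, String value) {
        if (activeReads.contains(flowFileId)) {
            throw new IllegalStateException(
                "FlowFile[" + flowFileId + "] already in use for an active callback");
        }
        // Attribute storage is omitted; only the state check matters here.
    }

    public static void main(String[] args) {
        ToySession session = new ToySession();
        long parentId = 0L; // FlowFile enqueued from another runner (assumed ID)
        long childId = 0L;  // newly created child, assumed to get the same ID

        try {
            // Modify the "child" while the "parent" with the same ID is being read.
            session.read(parentId, () -> session.putAttribute(childId, "filename", "child"));
            System.out.println("same IDs: accepted");
        } catch (IllegalStateException e) {
            System.out.println("same IDs: rejected: " + e.getMessage());
        }

        // With distinct IDs the same sequence of calls is accepted.
        session.read(0L, () -> session.putAttribute(1L, "filename", "child"));
        System.out.println("distinct IDs: accepted");
    }
}
```

If this model is anywhere near what the mock framework does, the first call is rejected only because the two IDs are equal, while the same call with distinct IDs goes through.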
I have a few questions:
First, obviously, why am I getting the following error: "FlowFile already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed"?
Second, am I doing this the right way? Is my approach of saving the flow files using MergeContent and PutFile, and then reading them back using GetFile, IdentifyMimeType and UnpackContent, correct? I was planning to feed the output of UnpackContent to my custom processor's TestRunner. Or is there some other, more preferable or standard approach that I am simply missing?
Third, will this approach preserve the attributes of the flow files (as the article says), so that I can simply enqueue them to my custom processor's test runner and it will run cleanly (assuming I manage to fix the above exception)?
Edit
While debugging, I stepped into some of the framework classes and ran e.printStackTrace() in the Eclipse debug shell. It printed this:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:89)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:41)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:541)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:763)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:463)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:209)
Caused by: java.lang.AssertionError: java.lang.IllegalStateException: FlowFile[0,622261873281800.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:201)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:160)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:155)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:150)
at MyCustomProcessorTest.testOnTrigger(MyCustomProcessorTest.java:47)
... 23 more
Caused by: java.lang.IllegalStateException: FlowFile[0,622261873281800.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
at org.apache.nifi.util.MockProcessSession.validateState(MockProcessSession.java:1014)
at org.apache.nifi.util.MockProcessSession.putAllAttributes(MockProcessSession.java:488)
at org.apache.nifi.util.MockProcessSession.inheritAttributes(MockProcessSession.java:1044)
at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:299)
at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:62)
at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker$1.process(UnpackContent.java:415)
at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:547)
at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:529)
at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker.unpack(UnpackContent.java:409)
at org.apache.nifi.processors.standard.UnpackContent.onTrigger(UnpackContent.java:255)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:251)
at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:245)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
where MyCustomProcessorTest.java:47 is unpackContentRunner.run(1).