1
votes

I am trying to explore how can we write unit test cases for Apache Nifi so that I can avoid "change code, build nar, paste nar in lib folder, restart nifi" loop. However, I guess, for this I also need to capture flow files to local disk and reload them each time I run unit test case. I came across this article which asks to serialize flow files to disk and then read these files and enqueue them to processor in unit test to feed them to my custom processor which I am currently developing. The article asks to use MergeContent with FlowFileV3 option and then use PutFile. I was able to persist these files in .pkg format. I am reading them back in my unit test code as suggested in the same article, using GetFile, IndetifyMimeType and UnpackContent processors. However I am doing this in code as below:

//Get File
TestRunner getFileRunner = TestRunners.newTestRunner(new GetFile());
getFileRunner.setProperty(GetFile.DIRECTORY, "C:\\Mahesh\\delete\\serialized-flow-file-2");
getFileRunner.setProperty(GetFile.KEEP_SOURCE_FILE, "true");
getFileRunner.run(1);
List<MockFlowFile> getFileResult = getFileRunner.getFlowFilesForRelationship(GetFile.REL_SUCCESS);

List<? extends FlowFile> getFileFFResult = getFileResult;

//IdentifyMimeType
TestRunner identifyMimeTypeRunner = TestRunners.newTestRunner(new IdentifyMimeType());
identifyMimeTypeRunner.enqueue(getFileFFResult.toArray(new FlowFile[getFileFFResult.size()]));
identifyMimeTypeRunner.run(1);
List<MockFlowFile> identifyMimeTypeResult = identifyMimeTypeRunner.getFlowFilesForRelationship(IdentifyMimeType.REL_SUCCESS);

List<? extends FlowFile> identifyMimeTypeFFResult = identifyMimeTypeResult;

//UnpackContent
TestRunner unpackContentRunner = TestRunners.newTestRunner(new UnpackContent());
unpackContentRunner.enqueue(identifyMimeTypeFFResult.toArray(new FlowFile[identifyMimeTypeFFResult.size()]));
unpackContentRunner.run(1);
List<MockFlowFile> unpackContentResult = unpackContentRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);

List<? extends FlowFile> unpackContentFFResult = unpackContentResult;

However I am getting following exception:

17:39:36.676 [pool-1-thread-1] INFO org.apache.nifi.processors.standard.GetFile - GetFile[id=2e2161db-48a7-4a13-b7dd-ec75ce2b30dc] added FlowFile[0,618912147321300.pkg,556530B] to flow
17:40:08.772 [pool-2-thread-1] INFO org.apache.nifi.processors.standard.IdentifyMimeType - IdentifyMimeType[id=aefc3abe-0820-48a0-8935-e905aeadb191] Identified FlowFile[0,618912147321300.pkg,556530B] as having MIME Type application/flowfile-v3
17:40:48.625 [pool-3-thread-1] ERROR org.apache.nifi.processors.standard.UnpackContent - UnpackContent[id=6840bef5-4e52-48ac-be2f-1f9580eeb144] UnpackContent[id=6840bef5-4e52-48ac-be2f-1f9580eeb144] failed to process due to java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed; rolling back session: java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
17:40:48.630 [pool-3-thread-1] ERROR org.apache.nifi.processors.standard.UnpackContent - 
java.lang.IllegalStateException: FlowFile[0,620665804273900.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
    at org.apache.nifi.util.MockProcessSession.validateState(MockProcessSession.java:1014)
    at org.apache.nifi.util.MockProcessSession.putAllAttributes(MockProcessSession.java:488)
    at org.apache.nifi.util.MockProcessSession.inheritAttributes(MockProcessSession.java:1044)
    at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:299)
    at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:62)
    at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker$1.process(UnpackContent.java:415)
    at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:547)
    at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:529)
    at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker.unpack(UnpackContent.java:409)
    at org.apache.nifi.processors.standard.UnpackContent.onTrigger(UnpackContent.java:255)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:251)
    at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:245)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I have some doubts:

  1. First, obviously, why I am getting following error: FlowFile already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed?

  2. Am I doing it right way? Is my approach to save those flow files using MergeContent and PutFile and then read them using GetFile, IndentifyMimeType and UnpackContent correct? I was thinking to feed my feed the output of UnpackContent to my custom processor's TestRunner? Is this all correct? Or their is some other more preferrable / standard approach to this which I am simply missing?

  3. Will this approach preserve attributes of flow files (as said in the article) so that I can blindly enqueue them to my custom processor's test runner and it will run clean (if at all I succeed to fix above exception)?

Edit

While debugging, I went into some of those framework classes and then in eclipse debug shell, I did e.printStackTrace() and it printed this:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:89)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:41)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:541)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:763)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:463)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:209)
Caused by: java.lang.AssertionError: java.lang.IllegalStateException: FlowFile[0,622261873281800.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
    at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:201)
    at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:160)
    at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:155)
    at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:150)
    at MyCustomProcessorTest.testOnTrigger(MyCustomProcessorTest.java:47)
    ... 23 more
Caused by: java.lang.IllegalStateException: FlowFile[0,622261873281800.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed
    at org.apache.nifi.util.MockProcessSession.validateState(MockProcessSession.java:1014)
    at org.apache.nifi.util.MockProcessSession.putAllAttributes(MockProcessSession.java:488)
    at org.apache.nifi.util.MockProcessSession.inheritAttributes(MockProcessSession.java:1044)
    at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:299)
    at org.apache.nifi.util.MockProcessSession.create(MockProcessSession.java:62)
    at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker$1.process(UnpackContent.java:415)
    at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:547)
    at org.apache.nifi.util.MockProcessSession.read(MockProcessSession.java:529)
    at org.apache.nifi.processors.standard.UnpackContent$FlowFileStreamUnpacker.unpack(UnpackContent.java:409)
    at org.apache.nifi.processors.standard.UnpackContent.onTrigger(UnpackContent.java:255)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:251)
    at org.apache.nifi.util.StandardProcessorTestRunner$RunProcessor.call(StandardProcessorTestRunner.java:245)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

where MyCustomProcessorTest.java:47 is unpackContentRunner.run(1).

1

1 Answers

5
votes

The mock framework is not really meant to write tests that chain together multiple processors. The mock framework is for unit testing an individual processor.

There are many different ways to setup a flow file using the mock framework.The content of the flow file can come from a file, a string, an input stream, or a byte array:

https://github.com/apache/nifi/blob/master/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java#L387-L453

An optional Map of attributes can be specified to setup the expected flow file attributes.

A common approach would be to setup files in src/test/resources for whatever data your custom processor expects and then call testRunner.enqueue(pathToTestFile).