6
votes

I have a few NiFi process groups that I want to run integration tests on before promoting them to production. The issue is that I can't find any documentation on how to do so.

Data Provenance seems like a promising tool for this. However, over the course of a flowfile's lifecycle, data is published to and consumed from Kafka or the file system. As a result, the flowfile UUID changes, so I cannot query for it using the nifi-api.

Additionally, I know that NiFi offers a TestRunner library for running tests. However, this seems to work only for processors/process groups built in code, not for those built in the UI.

Does anyone know of a tool, framework, or pattern for integration and unit testing NiFi process groups? Ideally this would be a solution where you can programmatically compare the input and output of a processor/process group without modifying the existing workflow.

2 Answers

2
votes

With the introduction of the Apache NiFi Registry, we have seen users promote flows from a development/sandbox environment to a test/QE environment where existing "test harness" flows surround the "flow under test". The harness sends repeatable, deterministic data (or an anonymized sample of real production data) through the flow and compares the results to an expected value.
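The comparison step of such a harness could look roughly like the sketch below. It assumes the harness has already collected the flow's output (for example from an output directory or a Kafka topic) as a sequence of strings; that collection step is not shown. `FlowOutputCheck`, `Diff`, and `compare` are hypothetical names used for illustration and are not part of NiFi or the NiFi Registry.

```scala
// Hypothetical helper for a test-harness comparison; not part of NiFi.
object FlowOutputCheck {

  // What the harness expected but did not see, and what it saw but did not expect.
  final case class Diff(missing: Seq[String], unexpected: Seq[String]) {
    def isEmpty: Boolean = missing.isEmpty && unexpected.isEmpty
  }

  // Seq.diff is a multiset difference, so record order does not matter
  // but duplicated or dropped records are still reported.
  def compare(expected: Seq[String], actual: Seq[String]): Diff =
    Diff(
      missing = expected.diff(actual),
      unexpected = actual.diff(expected)
    )
}
```

A harness would then fail the run whenever `FlowOutputCheck.compare(expected, actual).isEmpty` is false, and could log the `missing` and `unexpected` records to show what went wrong. Comparing as a multiset is a design choice for flows that do not guarantee ordering; if order matters, a plain `expected == actual` is the stricter check.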

As you point out, there is a TestRunner class and a whole testing framework provided for unit tests. While it can be difficult to manually translate a UI-constructed flow into a programmatic one, you could also write a translator that accepts a flow template or flow.xml.gz file and converts it into something the test framework can process.

1
votes

Maybe plumber will help you with flow testing.

We also wanted to test whole NiFi flows, not just a single processor, so we created this library and decided to open-source it. A simple example in Scala:

    // read flow previously exported from NiFi
    val template = TemplateDeserializer.deserialize(this.getClass.getClassLoader.getResourceAsStream("exported-flow.xml"))
    val flow = NifiTemplateFlowFactory(template).create()
    // enqueue some data to any processor
    flow.enqueueByName("csv row,12,another value,true", "CsvParserProcessor")

    // run entire flow once
    flow.run(1)

    // get the results from any processor
    val records = flow.resultsFromProcessorRelation("LastProcessorInFlow","successRelation")
    records should have size 1

This library is still under development, so improvements and ideas are welcome! :)