I am using NiFi 1.11.4 to build a data pipeline in which an IoT device sends data in JSON format. Each time the device sends data, I receive two JSON documents:

JSON_INITIAL

{
   "devId": "abc",
   "devValue": "TWOINITIALCHARS23"
}

and JSON_FINAL

{
   "devId": "abc",
   "devValue": "TWOINITIALCHARS45"
}

The two flow files arrive a few milliseconds apart. In my use case, I need to merge them so that the resulting JSON looks like the one below (please note that TWOINITIALCHARS is removed from both values):

JSON_RESULT_AFTER_MERGE

{
   "devId": "abc",
   "devValue": "2345"
}

Is this something NiFi should be dealing with? If so, I would really appreciate an approach for designing a suitable flow for this use case.

1 Answer

Assuming the devId is static for a device and not used for the correlation (i.e. abc for all messages coming from this device, not abc for the first two and then def for the next two, etc.), you have a few options:

  1. Use MergeContent to concatenate the flowfile contents (the two JSON blocks), then ReplaceText to rewrite the combined content into the desired output. This requires tuning the MergeContent binning properties to limit the merge window to 1-2 seconds (difficult or insufficient if you are receiving multiple messages per second, for example) and using regular expressions to remove the duplicate content.
  2. Use a custom script to process the device's JSON output (Groovy, for example, makes working with JSON pretty simple):
    • If you do this within the context of NiFi (via ExecuteScript or InvokeScriptedProcessor), you will have access to the NiFi framework, so you can evaluate flowfile attributes and content, making this easier (there will be attributes for initial timestamp, etc.).
    • If you do this outside the context of NiFi (via ExecuteProcess or ExecuteStreamCommand), you won't have access to the NiFi framework (attributes, etc.) but you may have better interaction with the device directly.
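
As a rough illustration of the script route in option 2, here is a minimal Python sketch of the merge logic itself. It is not a tested NiFi flow, and several things in it are assumptions: the function name `merge_readings` and the constant `PREFIX_LEN` are hypothetical, the device is assumed to send valid JSON, each `devValue` is assumed to start with exactly two characters that should be dropped, and the two readings are assumed to have already been collected in arrival order.

```python
import json

# Assumption: every devValue begins with exactly two characters to discard.
PREFIX_LEN = 2


def merge_readings(readings, prefix_len=PREFIX_LEN):
    """Combine readings from one device into a single record.

    Drops the first `prefix_len` characters of each devValue and joins the
    remainders in the order the readings were given.
    """
    if not readings:
        raise ValueError("expected at least one reading")

    # Refuse to merge readings that belong to different devices.
    dev_ids = {r["devId"] for r in readings}
    if len(dev_ids) != 1:
        raise ValueError("readings come from different devices: %s" % sorted(dev_ids))

    merged_value = "".join(r["devValue"][prefix_len:] for r in readings)
    return {"devId": readings[0]["devId"], "devValue": merged_value}


# Example input: the two messages as one JSON array ("XY" stands in for the
# two initial characters).
combined = '[{"devId": "abc", "devValue": "XY23"}, {"devId": "abc", "devValue": "XY45"}]'
print(json.dumps(merge_readings(json.loads(combined))))
# prints {"devId": "abc", "devValue": "2345"}
```

One way to get such an array is to set MergeContent's Header, Footer and Demarcator properties to `[`, `]` and `,`. If you run the script through ExecuteStreamCommand, a small wrapper could read the array from stdin and write the merged record to stdout. Either way, check the order of the merged flowfiles, since the result depends on the initial message coming first.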