
I am dealing with JSON objects containing geo-coordinate points. I would like to run these points against a PostGIS server I have locally to do point-in-polygon matching.

I'm hoping to do this with pre-existing processors. I am successfully extracting the lat/lon coordinates into attributes with an "EvaluateJsonPath" processor, and successfully issuing queries to my local PostGIS datastore with "ExecuteSQL". This leaves me with Avro responses, which I can then convert to JSON with the "ConvertAvroToJSON" processor.
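For reference, the query the ExecuteSQL processor issues might look something like the following. This is only a sketch: the `polygons` table, `polygon_id` and `geom` columns, and the `latitude`/`longitude` attribute names are assumptions, not taken from the actual flow; the `${...}` placeholders are NiFi expression-language references to the attributes set by EvaluateJsonPath.

```sql
-- Hypothetical table/column names; ${longitude} and ${latitude} are
-- NiFi expression-language references to flow file attributes.
SELECT polygon_id AS polygon_match
FROM polygons
WHERE ST_Contains(geom, ST_SetSRID(ST_MakePoint(${longitude}, ${latitude}), 4326));
```

Note the argument order: `ST_MakePoint` takes longitude first, then latitude.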

I'm having conceptual trouble with how to merge the results of the query back together with the original JSON object. As it is, I've got two flow files with the same fragment ID, which I could theoretically merge together with "MergeContent", but that gets me:

{"my":"original json", "coordinates":[47.38, 179.22]}{"polygon_match":"a123"}

Are there any suggested strategies for merging the results of the SQL query into the original json structure, so my result would be something like this instead:

{"my":"original json", "coordinates":[47.38, 179.22], "polygon_match":"a123"}
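What I'm after is just a shallow merge of the two JSON documents. Sketched in Python for clarity (field names taken from the examples above; the actual flow would do this inside NiFi):

```python
import json

# The two flow file payloads from the examples above
original = json.loads('{"my": "original json", "coordinates": [47.38, 179.22]}')
sql_result = json.loads('{"polygon_match": "a123"}')

# Shallow-merge the SQL result's keys into the original object
merged = {**original, **sql_result}
print(json.dumps(merged))
```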

I am running NiFi 0.6.0, PostgreSQL 9.5.2, and PostGIS 2.2.1.

I saw some reference to using the ReplaceText processor in https://community.hortonworks.com/questions/22090/issue-merging-content-in-nifi.html - but that seems to merge content from an attribute into the body of the flow file. What I'm missing is how to merge the content of the original flow file with either the content of the SQL response, or with attributes extracted from the SQL response.

Edit:

The following Groovy script appears to do what is needed. I am not a Groovy coder, so any improvements are welcome.

import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import groovy.json.JsonOutput

def flowFile = session.get()
if (flowFile == null) {
    return
}
def slurper = new JsonSlurper()

flowFile = session.write(flowFile,
    { inputStream, outputStream ->
        // The flow file content holds the SQL result (after ConvertAvroToJSON)
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def queryResult = slurper.parseText(text)
        // The original JSON was stashed in the 'original.json' attribute upstream
        def originalJson = slurper.parseText(flowFile.getAttribute('original.json'))
        originalJson.put('point_polygon_info', queryResult)
        outputStream.write(JsonOutput.toJson(originalJson).getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
Put a watch on https://issues.apache.org/jira/browse/NIFI-361; it should help with a standard way to express JSON transformations. - andrew
Awesome, thanks @andrew - this will be broadly useful, as a lot of what I do with NiFi is JSON transformation! - Josh Harrison

1 Answer


If your original JSON is relatively small, a possible approach is the following:

  • Use ExtractText before getting to ExecuteSQL to copy the original JSON into an attribute.
  • After ExecuteSQL, and after ConvertAvroToJSON, use an ExecuteScript processor to create a new JSON document that combines the original from the attribute with the results in the content.
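The combine step in the second bullet amounts to the following, sketched here in Python with a plain dict standing in for the flow file's attributes (the attribute name `original.json` and the `point_polygon_info` key are illustrative choices, not fixed by NiFi):

```python
import json

# Simulated flow file after ExecuteSQL + ConvertAvroToJSON:
# the content holds the query result, and the attribute holds the
# original JSON that ExtractText stashed earlier in the flow.
attributes = {"original.json": '{"my": "original json", "coordinates": [47.38, 179.22]}'}
content = '{"polygon_match": "a123"}'

# Parse both, nest the query result under a new key, and write out
original = json.loads(attributes["original.json"])
original["point_polygon_info"] = json.loads(content)
new_content = json.dumps(original)
print(new_content)
```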

I'm not exactly sure what needs to be done in the script, but I know others have had success using Groovy and JsonSlurper through the ExecuteScript processor.