5
votes

I want to merge two flowfiles that contain JSON objects, matching them on a shared attribute:

flow1:

attribute:    
xuuid = 123456

content:
{
"sname":"jack",
"id":"00001",
"state":"NY"
}

flow2:

attribute:    
xuuid = 123456

content:
{
"country":"US",
"date":"1983"
}

I expect a single output flowfile of this form:

desired_flow:

attribute:    
xuuid = 123456

content:
{
"sname":"jack",
"id":"00001",
"state":"NY",
"country":"US",
"date":"1983"
}

How do I do this? With the MergeContent processor or with MergeRecord? I think MergeRecord can handle it, but I find it confusing.


3 Answers

5
votes

Yes, MergeContent can do this for you.

I use EvaluateJsonPath --> MergeContent --> AttributesToJSON

I have posted a template you can use to play around: Apache NiFi Merge Json Template

MergeContent must have these settings: Attribute Strategy "Keep All Unique Attributes", Minimum Number of Entries 2, and Delimiter Strategy "Text".
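To make the idea behind this three-step flow easier to follow, here is a rough Python sketch of what each step does to the data. It is not NiFi code: the flowfile dictionaries and the function names (evaluate_json_path, merge_by_attribute, attributes_to_json) are made up for illustration, and grouping on xuuid is an assumption taken from the question rather than from the settings listed above.

```python
import json
from collections import defaultdict


def evaluate_json_path(flowfile):
    """Copy each top-level JSON field of the content into an attribute.

    Assumption: in a real flow each field would be configured explicitly;
    here every top-level key is copied for brevity.
    """
    fields = json.loads(flowfile["content"])
    attrs = dict(flowfile["attributes"])
    attrs.update({key: str(value) for key, value in fields.items()})
    return {"attributes": attrs, "content": flowfile["content"]}


def merge_by_attribute(flowfiles, correlation_attr, entries=2):
    """Bundle flowfiles that share correlation_attr and keep every
    attribute whose value does not conflict within the bundle."""
    bins = defaultdict(list)
    for ff in flowfiles:
        bins[ff["attributes"][correlation_attr]].append(ff)

    merged = []
    for group in bins.values():
        if len(group) < entries:
            continue  # bundle is incomplete, so nothing is emitted for it
        attrs, conflicts = {}, set()
        for ff in group:
            for key, value in ff["attributes"].items():
                if key in attrs and attrs[key] != value:
                    conflicts.add(key)
                attrs.setdefault(key, value)
        for key in conflicts:
            del attrs[key]
        content = "".join(ff["content"] for ff in group)
        merged.append({"attributes": attrs, "content": content})
    return merged


def attributes_to_json(flowfile, names):
    """Replace the content with a JSON object built from the named attributes."""
    attrs = flowfile["attributes"]
    content = json.dumps({name: attrs[name] for name in names if name in attrs})
    return {"attributes": attrs, "content": content}


flow1 = {"attributes": {"xuuid": "123456"},
         "content": '{"sname":"jack","id":"00001","state":"NY"}'}
flow2 = {"attributes": {"xuuid": "123456"},
         "content": '{"country":"US","date":"1983"}'}

extracted = [evaluate_json_path(ff) for ff in (flow1, flow2)]
merged = merge_by_attribute(extracted, "xuuid")
result = attributes_to_json(merged[0], ["sname", "id", "state", "country", "date"])
print(result["content"])
```

The point is that the JSON fields travel as attributes through the merge, so the merged content itself does not matter; the final step rebuilds the content from the combined attributes. Note that every value ends up as a string this way, which is fine for the sample data in the question.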

4
votes

What you are asking for is a streaming join, which is not something NiFi really does. There is a similar question and answer here:

https://stackoverflow.com/a/42909221/5650316

The merge processors are made to merge pieces of data one after another, not to perform a streaming join. For example, if you have many small JSON messages, you would want to use MergeContent or MergeRecord to merge thousands of them into a single flowfile before writing to HDFS.

0
votes

An answer to another question shows how this can be done with MergeContent followed by a JoltTransformJSON.

Like the OP here, I wanted to merge on a particular attribute (filename, in my case) so my MergeContent config was slightly different:

Merge Strategy: Bin-Packing Algorithm
Merge Format: Binary Concatenation
Correlation Attribute Name: filename  # or xuuid, or whatever you want
Minimum Number of Entries: 2
Delimiter Strategy: Text
Header: [
Footer: ]
Demarcator: ,

After that, the second part of the solution is the same:

Then transfer to JoltTransformJSON and set Jolt Transformation DSL to Shift and Jolt Specification to:

{
  "*": {
    "*": "&"
  }
}

This should do the job :)

Smashing solution, all kudos to @Ben Yaakobi.

The only thing I can add, by way of explanation, is that @Bryan Bende's answer is technically true, i.e. NiFi isn't designed for this kind of thing. Accordingly, the answer above is a bit of a hack:

  • In the first part, MergeContent actually ignores the fact that we're working with JSON altogether (its Binary Concatenation means it's just dealing with the content as raw bytes). It just "fakes" merging the two records into a JSON array by using the Header, Footer and Demarcator settings as shown, which happen to be JSON syntax.
  • Then in the second part, Jolt is able to parse that munged text as valid JSON, and apply its transformational magic.
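The two parts can be imitated in a few lines of Python, which may help show why the trick works. This is only a sketch of the effect, not of how NiFi or Jolt are implemented; the function names binary_concatenate and flatten_objects are invented here.

```python
import json


def binary_concatenate(contents, header="[", footer="]", demarcator=","):
    """Join the raw contents as plain text, without parsing them,
    using the same Header / Footer / Demarcator values as above."""
    return header + demarcator.join(contents) + footer


def flatten_objects(array_text):
    """Imitate the effect of the shift spec {"*": {"*": "&"}}:
    every key of every array element is lifted to the top level.

    The sample objects share no keys, so key collisions are not modelled.
    """
    flattened = {}
    for obj in json.loads(array_text):
        for key, value in obj.items():
            flattened[key] = value
    return flattened


content1 = '{"sname":"jack","id":"00001","state":"NY"}'
content2 = '{"country":"US","date":"1983"}'

merged_text = binary_concatenate([content1, content2])
print(merged_text)   # text that happens to be a valid JSON array
print(json.dumps(flatten_objects(merged_text)))
```

The first function never looks inside the content, which is the "hack" part: the result only parses as JSON because the header, footer and demarcator were chosen to be array syntax.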

For a better understanding of the Jolt syntax used, here are some helpful resources on the topic:

See also some alternative approaches mentioned here. In particular, I think the approach of using MergeRecord / MergeContent with a correlation attribute or Defragment mode, followed by QueryRecord with COALESCE and GROUP BY in order to join together the columns from both datasets, would be most relevant to this question (although I haven't tried this myself).