1 vote

I am using the NiFi flow ListFile >> FetchFile >> SplitJson >> UpdateAttribute >> FlattenJson >> InferAvroSchema >> ConvertRecord >> MergeRecord >> PutParquet.

JSON input:

[
    {
        "Id": 1235,
        "Username": "fred1235",
        "Name": "Fred",
        "ShippingAddress": {
            "Address1": "456 Main St.",
            "Address2": "",
            "City": "Durham",
            "State": "NC"
        }
    },
    {
        "Id": 1236,
        "Username": "larry1234",
        "Name": "Larry",
        "ShippingAddress": {
            "Address1": "789 Main St.",
            "Address2": "",
            "City": "Durham",
            "State": "NC",
            "PostalCode": 277453
        },
        "Orders": [
            {
                "ItemId": 1111,
                "OrderDate": "11/11/2012"
            },
            {
                "ItemId": 2222,
                "OrderDate": "12/12/2012"
            }
        ]
    }
]

The MergeRecord processor does not include the "Orders" array in the merged file's schema. I see the same issue with the MergeContent processor.

Can you show the schema that is being generated by InferAvroSchema? Also, is the JSON input shown above what is going into MergeRecord, or is that the example from the beginning of the flow? - Bryan Bende
{
  "type": "record",
  "name": "jsonRecord",
  "fields": [
    { "name": "Id", "type": "int", "doc": "Type inferred from '1236'" },
    { "name": "Username", "type": "string", "doc": "Type inferred from '\"larry1234\"'" },
    { "name": "Name", "type": "string", "doc": "Type inferred from '\"Larry\"'" },
    { "name": "ShippingAddress_Address1", "type": "string", "doc": "Type inferred from '\"789 Main St.\"'" },
    { "name": "ShippingAddress_Address2", "type": "string", "doc": "Type inferred from '\"\"'" },
    { "name": "ShippingAddress_City", "type": "string", "doc": "Type inferred from '\"Durham\"'" },
    { "name": "ShippingAddress_State", "type": "string", "doc": "Type inferred from '\"NC\"'" },
    { "name": "ShippingAddress_PostalCode", "type": "int", "doc": "Type inferred from '277453'" },
    { "name": "Orders", "type": { "type": "array", "items": { "type": "record", "name": "Orders", "fields": [
      { "name": "ItemId", "type": "int", "doc": "Type inferred from '1111'" },
      { "name": "OrderDate", "type": "string", "doc": "Type inferred from '\"11/11/2012\"'" }
    ] } }, "doc": "Type inferred from '[{\"ItemId\":1111,\"OrderDate\":\"11/11/2012\"}, {\"ItemId\":2222,\"OrderDate\":\"12/12/2012\"}]'" }
  ]
} - Amit Kadam
The above is the Avro schema for one Avro flow file, and another is: { "type" : "record", "name" : "jsonRecord", "fields" : [ { "name" : "Id", "type" : "int", "doc" : "Type inferred from '1237'" }, { "name" : "Username", "type" : "string", "doc" : "Type inferred from '\"fred12356\"'" }, { "name" : "Name", "type" : "string", "doc" : "Type inferred from '\"Fred\"'" }, - Amit Kadam

1 Answer

2 votes

Rather than using SplitJson and FlattenJson, you could use JoltTransformJSON with the following Chainr spec to flatten the whole array without splitting it:

[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "ShippingAddress": {
          "Address1": "[&2].ShippingAddress_Address1",
          "Address2": "[&2].ShippingAddress_Address2",
          "City": "[&2].ShippingAddress_City",
          "State": "[&2].ShippingAddress_State",
          "PostalCode": "[&2].ShippingAddress_PostalCode"
        },
        "Orders": {
          "*": {
            "ItemId": "[&3].Orders_&1_ItemId",
            "OrderDate": "[&3].Orders_&1_OrderDate"
          }
        },
        "*": "[&1].&"
      }
    }
  }
]
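
For the second record in your sample input, the Orders array flattens to indexed keys, so the output should look roughly like this (other fields elided):

    {
      "Id": 1236,
      "Username": "larry1234",
      "Name": "Larry",
      "Orders_0_ItemId": 1111,
      "Orders_0_OrderDate": "11/11/2012",
      "Orders_1_ItemId": 2222,
      "Orders_1_OrderDate": "12/12/2012"
    }

The first record simply has no Orders_* keys, since it has no Orders array.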

I'm not sure what the ConvertRecord is used for, but you shouldn't need MergeRecord anymore. If this isn't the output you're looking for, please let me know what you're expecting (for both records, the one with and the one without an Orders field) and I'll be happy to help.
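
If it helps to reason about the transform outside NiFi, here is a rough plain-Python sketch of the same flattening. The function name flatten_record is just for illustration (it is not part of NiFi or Jolt), and unlike a Jolt spec it flattens every ShippingAddress key generically rather than listing them one by one:

```python
def flatten_record(record):
    """Flatten ShippingAddress into ShippingAddress_<key> fields and
    Orders into Orders_<index>_<key> fields, like the Jolt spec does."""
    flat = {}
    for key, value in record.items():
        if key == "ShippingAddress":
            # Nested object -> underscore-prefixed scalar fields
            for sub_key, sub_value in value.items():
                flat[f"ShippingAddress_{sub_key}"] = sub_value
        elif key == "Orders":
            # Array of objects -> index-numbered scalar fields
            for i, order in enumerate(value):
                for sub_key, sub_value in order.items():
                    flat[f"Orders_{i}_{sub_key}"] = sub_value
        else:
            flat[key] = value
    return flat

records = [
    {"Id": 1235, "Username": "fred1235", "Name": "Fred",
     "ShippingAddress": {"Address1": "456 Main St.", "Address2": "",
                         "City": "Durham", "State": "NC"}},
    {"Id": 1236, "Username": "larry1234", "Name": "Larry",
     "ShippingAddress": {"Address1": "789 Main St.", "Address2": "",
                         "City": "Durham", "State": "NC",
                         "PostalCode": 277453},
     "Orders": [{"ItemId": 1111, "OrderDate": "11/11/2012"},
                {"ItemId": 2222, "OrderDate": "12/12/2012"}]},
]

flattened = [flatten_record(r) for r in records]
```

Note that records without an Orders array simply produce no Orders_* keys, which is why the schema merged across records has to account for optional fields.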