1
votes

I have incoming Avro records that roughly follow the format below. I am able to read them and convert them in existing NiFi flows. However, a recent change requires me to read from these files and parse the nested record (employers in this example). I read the Apache NiFi blog post "Record-Oriented Data with NiFi", but was unable to figure out how to get the AvroRecordReader to parse nested records.

{
  "name": "recordFormatName",
  "namespace": "nifi.examples",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "email", "type": "string" },
    { "name": "gender", "type": "string" },
    { "name": "employers",
        "type": "record",
        "fields": [
            {"name": "company", "type": "string"},
            {"name": "guid", "type": "string"},
            {"name": "streetaddress", "type": "string"},
            {"name": "city", "type": "string"}
        ]}
  ]
}
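Since the schema above is only an approximation, a note on the nested part: as far as I understand the Avro specification, a nested record is declared by making the field's type a full record definition with its own name, rather than putting "type": "record" and "fields" directly on the field. A minimal sketch of the employers field in that form follows; the record name employerRecord is a placeholder I made up and is not in the real schema.

```json
{
  "name": "employers",
  "type": {
    "type": "record",
    "name": "employerRecord",
    "fields": [
      { "name": "company", "type": "string" },
      { "name": "guid", "type": "string" },
      { "name": "streetaddress", "type": "string" },
      { "name": "city", "type": "string" }
    ]
  }
}
```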

What I hope to achieve is a flow that reads the employers record from each recordFormatName record and uses the PutDatabaseRecord processor to keep track of the employers values seen. The current plan is to insert the records into a MySQL database. As suggested in the answer below, I plan on using PartitionRecord to group the records based on a value in the employers subrecord. I do not need the top-level details for this particular flow.

I have tried to parse with the AvroRecordReader but cannot figure out how to specify the nested records. Is this something that can be accomplished with the AvroRecordReader alone, or does some preprocessing, say a JOLT transform, need to happen first?

EDIT: Added further details about database after receiving a response.


1 Answer

2
votes

What is your target DB and what does your target table look like? PutDatabaseRecord may not be able to handle nested records unless your DB, driver, and target table support them.

Alternatively you may need to use UpdateRecord to flatten the "employers" object into fields at the top level of the record. This is a manual process (until NIFI-4398 is implemented), but you only have 4 fields. After flattening the records, you could use PartitionRecord to get all records with a specific value for, say, employers.company. The outgoing flow files from PartitionRecord would technically constitute the distinct values for the partition field(s). I'm not sure what you're doing with the distinct values, but if you can elaborate I'd be happy to help.
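As a rough sketch of the flattening step: in UpdateRecord you would set Replacement Value Strategy to "Record Path Value" and add one user-defined property per field, where the property name is the target RecordPath and the value is the source RecordPath, for example /company = /employers/company, and likewise for guid, streetaddress, and city. The record writer would then need a schema that declares those top-level fields. Something like the following could serve as that writer schema; the record name employerFlat and the choice to drop the other top-level fields are my assumptions, so adjust them to match your target table.

```json
{
  "name": "employerFlat",
  "namespace": "nifi.examples",
  "type": "record",
  "fields": [
    { "name": "company", "type": "string" },
    { "name": "guid", "type": "string" },
    { "name": "streetaddress", "type": "string" },
    { "name": "city", "type": "string" }
  ]
}
```

With the records in this shape, a PartitionRecord property such as company = /company should group them by company and put that value in a flow file attribute named company.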