
I'm facing an issue converting a Kafka message field of type long holding nanoseconds (19 digits) into a string timestamp with milliseconds. The messages come in Avro format and use different schemas (so we can't statically define a single one) stored in Confluent Schema Registry. The current process is:

1) ConsumeKafkaRecord_2_0, which reads the message and stores the Avro schema coming from Confluent Schema Registry in the avro.schema attribute

2) UpdateAttribute, which looks for a known timestamp-field pattern in avro.schema and adds "logicalType":"timestamp-micros" (because there is no timestamp-nanos type in the Avro specification)

3) ConvertRecord, which converts the Avro flowfile to JSON using avro.schema. It applies the logicalType assigned in the previous step and renders the 19-digit long as yyyy-MM-dd HH:mm:ss.SSSSSS. The issue here is that a 19-digit value is a nanosecond timestamp, a type missing from the Avro specification, so we can only use timestamp-micros and end up with year 51000+ values.

4) ReplaceText - this processor is our workaround for the issue described above: we replace values matching the 5-digit-year pattern with a "correct" datetime (with milliseconds, because Java apparently can't handle microseconds here) using the expression ${'$1':toDate('yyyyy-MM-dd HH:mm:ss.SSSSSS'):toNumber():toString():substring(0, 13):toNumber():toDate():format('yyyy-MM-dd HH:mm:ss.SSS')} (the arithmetic behind this truncation is sketched right after this list)
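
To make the arithmetic explicit (a minimal sketch in plain Groovy; the sample value is made up for illustration): interpreting the 19-digit epoch-nanosecond long as timestamp-micros puts the date roughly a thousand times too far in the future (hence the 5-digit years), while its first 13 digits are exactly the epoch-millisecond value.

long nanos = 1554123456789123456L        // 19-digit epoch nanoseconds, made-up sample value
long wrongMillis = nanos.intdiv(1000)    // what a timestamp-micros interpretation boils down to
println new Date(wrongMillis)            // lands around year 51000+, the 5-digit-year symptom
long millis = nanos.intdiv(1000000)      // same as keeping the first 13 digits
println new Date(millis)                 // April 2019, the expected timestamp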

After that we continue with other processors. The workaround works, but with a strange issue: the resulting timestamps differ by a few milliseconds from what we receive in Kafka, which I can only attribute to the transformations described above. So my question is: is there a better way to handle 19-digit values coming in the Avro messages (the schemas are in Confluent Schema Registry, and the pattern for timestamp fields in the schema is known) so that they are cast into correct millisecond timestamps? Maybe some kind of field value replacement (taking a 13-digit substring of the 19-digit value) in the Avro flowfile content based on its schema, which is stored in the avro.schema attribute?

Please let me know if something is unclear or if additional details are needed. Thanks a lot in advance!

The problem appears when you try to convert a string (with a nanosecond date) to a date using a format like whatever.SSSSSS. Java can return unexpected millis for this. In your expression you are just cutting off the last three digits, so just use a substring or replace... - daggett
@daggett The issue is that I can change the Avro schema logicalType for a known timestamp-field pattern, but I don't know how to substring the value in the Avro content based on that pattern in the schema. That's why I have to convert the type first to timestamp-millis or timestamp-micros (only those logical types exist in Avro, there is no timestamp-nanos), and only then can I use the 5-digit-year pattern to "recalculate" the datetimes by converting them back to longs, taking a 13-digit substring, and converting back to a datetime. - Alexey Chibisov
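
To illustrate daggett's point (a minimal sketch in plain Groovy, with a made-up value): SimpleDateFormat has no sub-millisecond precision, so a six-digit fraction parsed with .SSSSSS is folded in as milliseconds, and the millisecond part of the result no longer matches the original fraction - which is exactly the kind of few-milliseconds drift described above.

import java.text.SimpleDateFormat

def sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss.SSSSSS')
def parsed = sdf.parse('2019-04-01 12:57:36.123456')
println parsed       // 12:59:39 - the 123456 fraction was added as ~123 seconds
println parsed.time  // epoch millis now end in 456 rather than the original 123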

1 Answer


The following solution worked for our case: a Groovy script which converts one Avro file into another (adjusting both schema and content):

@Grab('org.apache.avro:avro:1.8.2')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*
import org.apache.nifi.processor.io.StreamCallback

//function which is traversing through all records (including nested ones)
def convertAvroNanosecToMillisec(record){
    record.getSchema().getFields().forEach{ Schema.Field field -> 
        if (record.get(field.name()) instanceof org.apache.avro.generic.GenericData.Record){
            convertAvroNanosecToMillisec(record.get(field.name()))
        }

        // the Debezium NanoTimestamp marker ("connect.name") can appear directly on the
        // field schema or on a branch of a union type; in both cases truncate the 19-digit
        // nanosecond value to its first 13 digits (epoch millis) and relabel the schema
        if (field.schema().getType().getName() == "union"){
            field.schema().getTypes().forEach{ Schema unionTypeSchema ->
                if(unionTypeSchema.getProp("connect.name") == "io.debezium.time.NanoTimestamp"){
                    record.put(field.name(), Long.valueOf(record.get(field.name()).toString().substring(0, 13)))
                    unionTypeSchema.addProp("logicalType", "timestamp-millis")
                }
            }
        } else {
            if(field.schema().getProp("connect.name") == "io.debezium.time.NanoTimestamp"){
                record.put(field.name(), Long.valueOf(record.get(field.name()).toString().substring(0, 13)))
                field.schema().addProp("logicalType", "timestamp-millis")
            }
        }

    } 
    return record
}

//start flowfile processing
def flowFile = session.get()
if(!flowFile) return

try {

flowFile = session.write(flowFile, {inStream, outStream ->
  // Defining avro reader and writer
  DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
  DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())

  def contentSchema = reader.schema //source Avro schema
  def records = [] //list used to temporarily store the processed records

  //reading all records from incoming file and adding to the temporary list
  reader.forEach{ GenericRecord contentRecord -> 
      records.add(convertAvroNanosecToMillisec(contentRecord))
  }

  //creating a file writer object with adjusted schema
  writer.create(contentSchema, outStream)

  //adding records to the output file from the temporary list and closing the writer
  records.forEach{ GenericRecord contentRecord -> 
      writer.append(contentRecord)
  }

  writer.close()

} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
   log.error('Error converting Avro nanosecond timestamps to milliseconds', e)
   flowFile = session.penalize(flowFile)
   session.transfer(flowFile, REL_FAILURE)
}
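
For reference: the script uses the session, log, REL_SUCCESS and REL_FAILURE bindings, so it is presumably meant to run in NiFi's ExecuteScript processor with the Groovy engine (with the Avro dependency pulled in via the @Grab line), replacing the UpdateAttribute/ConvertRecord/ReplaceText workaround from the question.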