1
votes

I am trying to use NiFi's PutCassandraRecord processor to insert JSON records into a Cassandra database. One of the fields is a Cassandra timestamp, but NiFi fails with a NumberFormatException for the input string "2019-02-02T08:00:00.000".

The Cassandra column for the timestamp field is declared as (ts timestamp). In my Avro schema the field is defined as: { "name": "ts", "type": {"type": "long", "logicalType": "timestamp-millis"}}

{
  "name": "app.records",
  "type": "record",
  "fields": [
    { "name": "ts", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    { "name": "app_name", "type": "string" },

The NiFi logs show that it parsed the JSON object but couldn't convert it to a record:

2019-05-13 21:13:04,036 ERROR [Timer-Driven Process Thread-2] o.a.n.p.cassandra.PutCassandraRecord PutCassandraRecord[id=ecb33d77-cc4a-17f5-23a8-e002e1777a1c] Unable to write the records into Cassandra table due to org.apache.nifi.serialization.MalformedRecordException: Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema: org.apache.nifi.serialization.MalformedRecordException: Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema
org.apache.nifi.serialization.MalformedRecordException: Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema
        at org.apache.nifi.json.AbstractJsonRowRecordReader.nextRecord(AbstractJsonRowRecordReader.java:98)
        at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
        at org.apache.nifi.processors.cassandra.PutCassandraRecord.onTrigger(PutCassandraRecord.java:151)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
        at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "2019-02-02T08:00:35.473"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at org.apache.nifi.serialization.record.util.DataTypeUtils.toTimestamp(DataTypeUtils.java:1057)
        at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:156)
        at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:120)
        at org.apache.nifi.json.JsonTreeRowRecordReader.convertField(JsonTreeRowRecordReader.java:170)
        at org.apache.nifi.json.JsonTreeRowRecordReader.convertJsonNodeToRecord(JsonTreeRowRecordReader.java:137)
        at org.apache.nifi.json.JsonTreeRowRecordReader.convertJsonNodeToRecord(JsonTreeRowRecordReader.java:83)
        at org.apache.nifi.json.JsonTreeRowRecordReader.convertJsonNodeToRecord(JsonTreeRowRecordReader.java:74)
        at org.apache.nifi.json.AbstractJsonRowRecordReader.nextRecord(AbstractJsonRowRecordReader.java:93)
        ... 14 common frames omitted

The types seem to be all correct. Any help would be appreciated.

2
Which version of NiFi are you using? - Sivaprasanna Sethuraman
I am using 1.9.2 - apspam

2 Answers

0
votes

The problem is that you're trying to insert a timestamp field without specifying the format of the date. The corresponding code works as follows:

If the input data is a string, NiFi tries to get a format string for it, and if that format string yields a valid formatter, it parses the date with it. If the format string is either not specified or not valid, NiFi falls back to converting the value with Long.parseLong, which is where the NumberFormatException comes from.
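The fallback described above can be illustrated with a small Python sketch. This is not NiFi's code. The function name `to_timestamp_millis`, the `fmt` parameter, and the choice to treat the parsed time as UTC are all assumptions made for illustration, and it only models the "no format specified" branch.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def to_timestamp_millis(value, fmt=None):
    """Hypothetical helper mirroring the described fallback.

    With a format string, the text is parsed as a date (assumed UTC).
    Without one, the text is parsed as a plain integer, which fails
    for ISO-style strings much like Long.parseLong does.
    """
    if isinstance(value, int):
        return value
    if fmt is not None:
        parsed = datetime.strptime(value, fmt)
        # Exact integer milliseconds since the Unix epoch.
        return (parsed - EPOCH) // timedelta(milliseconds=1)
    # No format available: fall back to numeric parsing.
    return int(value)


if __name__ == "__main__":
    print(to_timestamp_millis("2019-02-02T08:00:35.473", "%Y-%m-%dT%H:%M:%S.%f"))
    try:
        to_timestamp_millis("2019-02-02T08:00:35.473")
    except ValueError as err:
        print("numeric fallback failed:", err)
```

Without a format, the ISO-style string reaches the numeric parse and raises, which is the Python analogue of the exception in the stack trace.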

One option is to explicitly convert the corresponding field with something like this (note HH for the 24-hour clock and .SSS for the milliseconds in your input):

toDate("yyyy-MM-dd'T'HH:mm:ss.SSS")
0
votes

I ended up converting the datetime to an epoch timestamp in milliseconds and casting it to a long so that it works with my Avro schema.

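import datetime

# strippedTime holds the timestamp string with the date and time separated by a space
ts = datetime.datetime.strptime(strippedTime, '%Y-%m-%d %H:%M:%S.%f')
epoch = datetime.datetime(1970, 1, 1)
# Milliseconds since the Unix epoch; int() also works where long() is unavailable
timestamp = int((ts - epoch).total_seconds() * 1000)
fields['ts'] = timestamp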
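For reference, here is a self-contained Python 3 sketch of the same conversion. It parses the original 'T'-separated string directly and uses integer timedelta division, which avoids the float rounding that multiplying `total_seconds()` by 1000 can introduce. The function name `iso_to_epoch_millis` is hypothetical, and the input is assumed to be in UTC.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def iso_to_epoch_millis(text):
    """Convert e.g. '2019-02-02T08:00:35.473' to epoch milliseconds.

    Assumes the string carries no time zone and represents UTC.
    """
    ts = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%f")
    # timedelta // timedelta yields an exact int (Python 3 only).
    return (ts - EPOCH) // timedelta(milliseconds=1)


if __name__ == "__main__":
    fields = {"app_name": "example"}  # placeholder record for illustration
    fields["ts"] = iso_to_epoch_millis("2019-02-02T08:00:35.473")
    print(fields)
```

The resulting integer is what a "long" field with logicalType "timestamp-millis" expects, so the JSON reader no longer has to interpret a date string.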