
Error produced by JDBC sink connector:

org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.util.Date for field: "some_timestamp_field"
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:151)
at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:107)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

The Avro schema registered by the source JDBC connector (MySQL):

{  
   "type":"record",
   "name":"ConnectDefault",
   "namespace":"io.confluent.connect.avro",
   "fields":[  
      ...
      {  
         "name":"some_timestamp_field",
         "type":{  
            "type":"long",
            "connect.version":1,
            "connect.name":"org.apache.kafka.connect.data.Timestamp",
            "logicalType":"timestamp-millis"
         }
      },
      ...
   ]
}

Looks like the exception is due to this code block: https://github.com/apache/kafka/blob/f0282498e7a312a977acb127557520def338d45c/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L239

So, in the Avro schema, the timestamp field is registered as INT64 with the correct (timestamp) logical type. But Connect reads the schema type as plain INT64 and compares it with the value type java.util.Date.
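
For reference, the failure can be reproduced with just the Connect data API. This is a minimal sketch (not the connector code itself), assuming the Cast transform rebuilds the value schema without the Timestamp logical-type name: validateValue only accepts java.util.Date for an INT64 schema when that logical name is present.

import java.util.Date;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;

public class ValidateValueRepro {
    public static void main(String[] args) {
        // INT64 schema that still carries the Timestamp logical-type name:
        // java.util.Date passes validation.
        Schema withLogicalName = SchemaBuilder.struct()
                .field("some_timestamp_field", Timestamp.SCHEMA)
                .build();
        new Struct(withLogicalName).put("some_timestamp_field", new Date()); // OK

        // Plain INT64 schema with the logical name stripped (presumably what the
        // Cast transform produces here): the same value now fails validation.
        Schema plainInt64 = SchemaBuilder.struct()
                .field("some_timestamp_field", Schema.INT64_SCHEMA)
                .build();
        new Struct(plainInt64).put("some_timestamp_field", new Date());
        // -> DataException: Invalid Java object for schema type INT64: class java.util.Date ...
    }
}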

Is this a bug, or is there a workaround for it? Maybe I am missing something, as this looks like a standard Connect data model.

Thanks in advance.

UPDATE

Sink connector config:

{
    "name": "sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "topic",
        "connection.url": "jdbc:postgresql://host:port/db",
        "connection.user": "user",
        "connection.password": "password",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://host:port",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://host:port",

        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "id"
    }
}

Deserialised data in Kafka:

{
   "id":678148,
   "some_timestamp_field":1543806057000,
   ...
}

Comments on the question:

OneCricketeer: Can you show example topic data? And your connector config?
darkprince92: Added the topic data and connector config.
OneCricketeer: And which Kafka Connect or Confluent version are you running? The link you pasted is from an unreleased version of Confluent Platform (although I don't think that section of code has changed recently).
darkprince92: We are using the Kafka Connect shipped with Kafka 1.1.0. For the Schema Registry, we are using the executable shipped with confluent-oss-4.0.0. As for Kafka Connect JDBC, version kafka-connect-jdbc-5.2.0-SNAPSHOT is used.
darkprince92: @cricket_007, here is the update: when we drop the Cast transform, the error is gone. Most probably we are facing this issue: issues.apache.org/jira/browse/KAFKA-5891. Any thoughts on this?

1 Answer


We have worked out a workaround for the problem. Our goal was to convert the id from BIGINT to STRING (TEXT/VARCHAR) and save the record in the downstream DB.

But due to an issue (probably https://issues.apache.org/jira/browse/KAFKA-5891), casting the id field was not working. Kafka Connect was also trying to validate the timestamp fields in the casting chain, but was reading the schema type/name incorrectly, resulting in a type mismatch (see the record body and error log above).
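
The question's config does not show the failing transform chain, but from the stack trace it was presumably a value-level cast of the id field along these lines (a hypothetical reconstruction, not the exact config):

        "transforms": "castIdToString",
        "transforms.castIdToString.type": "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.castIdToString.spec": "id:string"

With Cast$Value, the transform rebuilds the whole value schema and copies every field into a new Struct (see Cast.applyWithSchema in the stack trace), so validation runs over the unrelated timestamp field as well, which is where the error surfaces.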

So we worked around it as follows:

extract only the id field as the key -> apply the cast transform on the key -> this works because the key does not contain the timestamp field.

Here is the workaround configuration:

{
    "name": "sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "topic",
        "connection.url": "jdbc:postgresql://host:port/db",
        "connection.user": "user",
        "connection.password": "password",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://host:port",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://host:port",

        "transforms": "createKey,castKeyToString",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "id",

        "transforms.castKeyToString.type": "org.apache.kafka.connect.transforms.Cast$Key",
        "transforms.castKeyToString.spec": "id:string",

        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "id"
    }
}
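
With this chain, ValueToKey builds a new record key containing only the id copied from the value, and Cast$Key then casts that single field to a string, so for the record shown above the key logically ends up as something like (a sketch of the key contents, not the Avro wire format):

{
    "id": "678148"
}

Because pk.mode is record_key and pk.fields is id, the sink uses that string key as the upsert primary key, and the Cast transform never has to validate the timestamp field in the value.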

Disclaimer: This is not a proper solution, just a workaround. The bug in the Cast transform should be fixed. In my opinion, the Cast transform should only concern itself with the fields designated for casting, not with other fields in the message.

Have a good day.