Error produced by the JDBC sink connector:
org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.util.Date for field: "some_timestamp_field"
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:151)
at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:107)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The Avro schema registered by the source JDBC connector (MySQL):
{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "fields": [
    ...
    {
      "name": "some_timestamp_field",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    },
    ...
  ]
}
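For context, the connect.name and connect.version attributes in that schema identify Connect's built-in Timestamp logical type. Below is a minimal sketch of the equivalent Connect-side schema, using the standard org.apache.kafka.connect.data API (the class name TimestampSchemaDemo is just for illustration):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;

public class TimestampSchemaDemo {
    public static void main(String[] args) {
        // Timestamp.SCHEMA is an INT64 schema tagged with the logical name
        // "org.apache.kafka.connect.data.Timestamp", version 1 -- matching the
        // connect.name / connect.version attributes in the registered Avro schema.
        Schema ts = Timestamp.SCHEMA;
        System.out.println(ts.type());    // INT64
        System.out.println(ts.name());    // org.apache.kafka.connect.data.Timestamp
        System.out.println(ts.version()); // 1

        // Equivalent builder form:
        Schema built = SchemaBuilder.int64().name(Timestamp.LOGICAL_NAME).version(1).build();
        System.out.println(built.name()); // same logical name
    }
}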
The exception appears to come from this check: https://github.com/apache/kafka/blob/f0282498e7a312a977acb127557520def338d45c/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L239
So in the Avro schema the timestamp field is registered as INT64 with the correct (timestamp) logical type, yet Connect validates the field against the raw schema type INT64 and rejects the value, whose Java class is java.util.Date. Judging by the stack trace, the failure happens inside the Cast transformation (Cast.applyWithSchema), so the schema being validated is the one rebuilt by the transform, which may no longer carry the Timestamp logical name.
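For illustration, here is a condensed paraphrase (not a verbatim copy) of the check in ConnectSchema.validateValue, reduced to the two cases relevant here:

import java.util.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;

class ValidateValueSketch {
    // Paraphrase of ConnectSchema.validateValue: logical types are matched by
    // schema *name* first; only when no logical name matches does validation
    // fall back to the raw schema type.
    static void validateValue(String field, Schema schema, Object value) {
        if (Timestamp.LOGICAL_NAME.equals(schema.name())) {
            // An INT64 schema named "org.apache.kafka.connect.data.Timestamp"
            // accepts java.util.Date...
            if (value instanceof Date) return;
        } else if (schema.type() == Schema.Type.INT64) {
            // ...but a plain INT64 schema (no logical name) only accepts Long.
            if (value instanceof Long) return;
        }
        throw new DataException("Invalid Java object for schema type " + schema.type()
                + ": " + value.getClass() + " for field: \"" + field + "\"");
    }
}

With the logical name intact, a java.util.Date passes; with a schema rebuilt as plain INT64 (as a transform like Cast might produce), the same value fails with exactly the DataException above.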
Is this a bug, or is there a workaround? Maybe I am missing something, since this looks like the standard Connect data model.
Thanks in advance.
UPDATE
Sink connector config:
{
  "name": "sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "topic",
    "connection.url": "jdbc:postgresql://host:port/db",
    "connection.user": "user",
    "connection.password": "password",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://host:port",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://host:port",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id"
  }
}
Deserialised data in Kafka:
{
  "id": 678148,
  "some_timestamp_field": 1543806057000,
  ...
}
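For reference, that raw long is what Connect's Timestamp logical type turns into the java.util.Date the validator then sees; a quick check (the class name DecodeDemo is just for illustration):

import java.util.Date;
import org.apache.kafka.connect.data.Timestamp;

public class DecodeDemo {
    public static void main(String[] args) {
        // Reproduces the conversion the AvroConverter performs for
        // timestamp-millis fields: epoch millis -> java.util.Date.
        Date d = Timestamp.toLogical(Timestamp.SCHEMA, 1543806057000L);
        System.out.println(d.getClass()); // class java.util.Date
        System.out.println(d);            // Mon Dec 03 ... 2018 (timezone dependent)
    }
}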
We are on Kafka 1.1.0. For Schema Registry, we are using the executable shipped with confluent-oss-4.0.0. As for Kafka Connect JDBC, version kafka-connect-jdbc-5.2.0-SNAPSHOT is used. – darkprince92