6
votes

I am trying to use the Kafka Connect JDBC sink connector to insert data into Oracle, but it throws an error. I have tried every schema configuration I could think of. The examples are below.

Please suggest if I am missing anything. My configuration files and errors are below.

Case 1 - First configuration

internal.value.converter.schemas.enable=false

With this setting I get the following error:

[2017-08-28 16:16:26,119] INFO Sink task WorkerSinkTask{id=oracle_sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)

[2017-08-28 16:16:26,606] INFO Discovered coordinator dfw-appblx097-01.prod.walmart.com:9092 (id: 2147483647 rack: null) for group connect-oracle_sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)

[2017-08-28 16:16:26,608] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)

[2017-08-28 16:16:26,609] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)

[2017-08-28 16:16:27,174] INFO Successfully joined group connect-oracle_sink with generation 26 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)

[2017-08-28 16:16:27,176] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)

[2017-08-28 16:16:28,580] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)

org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DJ

   at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:190)

   at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)

   at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)

   at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)

   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)

   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)

   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)

   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

   at java.util.concurrent.FutureTask.run(FutureTask.java:266)

   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

   at java.lang.Thread.run(Thread.java:748)

Case 2 - Second configuration

internal.key.converter.schemas.enable=true

internal.value.converter.schemas.enable=true

Log:

[2017-08-28 16:23:50,993] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)

[2017-08-28 16:23:50,993] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)

[2017-08-28 16:23:51,260] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)

[2017-08-28 16:23:51,381] INFO Successfully joined group connect-oracle_sink with generation 29 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)

[2017-08-28 16:23:51,384] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)

[2017-08-28 16:23:51,727] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)

My Oracle connector.properties looks like this:

name=oracle_sink

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

tasks.max=1

topics=DJ

connection.url=jdbc:oracle:thin:@hostname:port:sid

connection.user=username

connection.password=password

#key.converter=org.apache.kafka.connect.json.JsonConverter

#value.converter=org.apache.kafka.connect.json.JsonConverter

auto.create=true

auto.evolve=true

Connect-Standalone.properties

My JSON looks like this:

{"Item":"12","Sourcing Reason":"corr","Postal Code":"l45","OrderNum":"10023","Intended Node Distance":1125.8,"Chosen Node":"34556","Quantity":1,"Order Date":1503808765201,"Intended Node":"001","Chosen Node Distance":315.8,"Sourcing Logic":"reducesplits"}

2 Answers

14
votes

Per the documentation:

The sink connector requires knowledge of schemas, so you should use a suitable converter e.g. the Avro converter that comes with the schema registry, or the JSON converter with schemas enabled.

So if your data is JSON you would have the following configuration:

[...]
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
[...]

The error you see in the second case is pertinent: JsonConverter with schemas.enable requires "schema" and "payload" fields. The JSON you shared does not meet this required format.

Here's a simple example of a valid JSON message with schema and payload :

{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "int32",
            "optional": true,
            "field": "c1"
        }, {
            "type": "string",
            "optional": true,
            "field": "c2"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "create_ts"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "update_ts"
        }],
        "optional": false,
        "name": "foobar"
    },
    "payload": {
        "c1": 10000,
        "c2": "bar",
        "create_ts": 1501834166000,
        "update_ts": 1501834166000
    }
}
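
To see quickly whether a given message has this shape, a small check like the one below may help. It is only a sketch of the rule quoted in the error message (a JSON object with exactly the two top-level fields "schema" and "payload"); the function name is_schema_envelope is made up for illustration and is not part of Kafka Connect.

```python
import json


def is_schema_envelope(raw: str) -> bool:
    """Return True if raw is a JSON object whose only top-level keys
    are "schema" and "payload" (hypothetical helper, not a Connect API)."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        return False
    return isinstance(msg, dict) and set(msg) == {"schema", "payload"}


# A plain JSON record, like the one in the question (shortened here).
plain = '{"Item": "12", "Quantity": 1}'
# The same idea wrapped in a schema/payload envelope.
wrapped = '{"schema": {"type": "struct", "fields": []}, "payload": {}}'

print(is_schema_envelope(plain))
print(is_schema_envelope(wrapped))
```

The check only looks at the top-level keys; it does not verify that the schema actually describes the payload.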

What's your source for the data that you're trying to land in Oracle? If it's Kafka Connect inbound, then simply using the same converter configuration (Avro + Confluent Schema Registry) would be easier and more efficient. If it's a custom application, you'll need to get it to either (a) use the Confluent Avro serialiser or (b) write the JSON in the required format above, providing the schema of the payload inline with the message.
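
For option (b), a custom producer could wrap each flat record before sending it. The following is a rough sketch only: wrap_with_schema and connect_type are hypothetical helper names, the schema name "DJ" is an assumption taken from the topic in the question, and the type mapping (str to "string", int to "int64", float to "double", bool to "boolean") is a simple guess from the Python value rather than anything the connector does for you.

```python
import json


def connect_type(value):
    """Map a Python value to a Connect JSON schema type name (assumed mapping)."""
    # bool is checked first because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int64"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    raise TypeError(f"unsupported value type: {type(value).__name__}")


def wrap_with_schema(record: dict, name: str) -> dict:
    """Build a schema/payload envelope around a flat record."""
    fields = [
        {"type": connect_type(v), "optional": True, "field": k}
        for k, v in record.items()
    ]
    schema = {"type": "struct", "fields": fields, "optional": False, "name": name}
    return {"schema": schema, "payload": record}


# A shortened version of the record from the question.
record = {"Item": "12", "Quantity": 1, "Intended Node Distance": 1125.8}
message = json.dumps(wrap_with_schema(record, "DJ"))
print(message)
```

Inferring types per message is fragile: a distance that happens to arrive as 1125 instead of 1125.8 would be declared int64 rather than double. In practice it is probably safer to write the field list out by hand once and reuse it for every message.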

1
votes

I had the same problem, and after reading this post I resolved it using the JDBC sink with MySQL. Below is my Kafka Connect configuration, as additional information:

curl --location --request POST 'http://localhost:8083/connectors/' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "ttib-transactions",
        "connection.url": "jdbc:mysql://172.17.0.1:6603/tt-tran?verifyServerCertificate=true&useSSL=false",
        "connection.user": "root",
        "connection.password": "*******",
        "value.converter.schema.registry.url": "https://psrc-j55zm.us-central1.gcp.confluent.cloud",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "insert.mode": "insert",
        "batch.size":"0",
        "table.name.format": "${topic}",
        "pk.fields" :"id"
    }
}'