
I am trying to insert rows into my Oracle table using Kafka jdbc sink connect. I have messages in my Kafka topic (JSON) like below;

[{"f1":"qws","f2":"zcz","f3":"SDFF","f4":"f33bfed577bcd7c4625479bd3cd13323--1132061303","f5":null,"f6":null,"f7":"ghSDAgh/akdjytfd/jhsgd","f8":"hsfgd/sdfjghsfjd/jsg","f9":null,"f10":"ASD","f11":"sdfg/vbnm","f12":"S","startTime":"2018-01-30T05:24:41.162","_startTime":"DATE","f13":219,"f14":"http://192.168.0.1:1234/asd/fgh/jkl/zxc/vbn/qwe/rty","f15":"fe80:0:0:0:7501:14d9:b44b:2a95%eth5","f16":1234,"f17":"ABCD-1234","f18":"192.168.0.1","f19":"sdfgd","dfgVO":{"fa1":null,"fa2":"formats","fa3":"\"qwe.rty.uiop.asd.fgh.jkl.zxc.vbn.asdf@61e97f29\"","fa4":7,"fa5":79,"fa6":null,"fa7":"{}","fa8":1517289881381},"f20":null,"f21":"http-drte-1234-uik-7","f22":false,"f23":false,"f24":false}]

I have the connector configuration like below;

name=jdbc-sink-2
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my_topic_1
connection.url=jdbc:oracle:thin:@192.168.0.1:1521:user01
connection.user=USER1
connection.password=PASSWD1
auto.create=true
table.name.format=MY_TABLE_2
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
producer.retries=1

When I start the connector, I am getting the error below;

[2018-01-30 11:16:55,417] ERROR Task jdbc-sink-2 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:406)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-01-30 11:16:55,422] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)

Then I added the below configurations to my existing connector configuration;

key.converter.schemas.enable=false
value.converter.schemas.enable=false

Now, I am getting another error like below;

[2018-01-30 11:36:58,118] ERROR Task jdbc-sink-2 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: MY_TABLE_2
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:190)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-01-30 11:36:58,123] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2018-01-30 11:36:58,124] ERROR Task jdbc-sink-2 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-01-30 11:36:58,125] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)

This suggests that my Kafka messages need to be in a key/value schema format. I cannot change the message format, since the messages are published by someone else. How can I fix this error?

Thank you.

Comment from OneCricketeer: The Kafka message should probably be a JSON object, not an array.

1 Answer


Per the documentation, if you want to use the JDBC Sink, you need to provide a schema. You can do this either with Avro plus the Schema Registry, or with JSON that embeds the schema. You can see a sample of the expected JSON structure here.
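For illustration, a message with an embedded schema looks roughly like this (the two fields shown are taken from your record; the struct name and types are a hypothetical sketch, not a complete mapping of your payload):

```json
{
  "schema": {
    "type": "struct",
    "name": "my_record",
    "optional": false,
    "fields": [
      {"field": "f1", "type": "string", "optional": true},
      {"field": "f13", "type": "int32", "optional": true}
    ]
  },
  "payload": {
    "f1": "qws",
    "f13": 219
  }
}
```

With an envelope like this, you can leave `value.converter.schemas.enable=true`, and the sink can derive the table columns from the `schema` section.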

Where is your data coming from? If it's a Kafka Connect source, you can simply use Avro or JSON with schemas enabled. If it comes from elsewhere, you'll need to amend the producing side so that the data includes a schema; the Avro serialiser provided with the Schema Registry can do exactly this for you.
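If you cannot touch the upstream producer, another option is a small re-publishing step that wraps each plain record in the schema/payload envelope before writing it to a new topic that the sink reads. A minimal sketch, assuming for simplicity that every field can be declared as an optional string (a real converter would map each field to its proper Connect type):

```python
import json

def wrap_with_schema(record: dict, name: str = "my_record") -> str:
    """Wrap a plain JSON record in the schema/payload envelope that
    JsonConverter expects when schemas.enable=true.

    NOTE: declaring every field as an optional string is a simplifying
    assumption; numeric and boolean fields should get their real types.
    """
    schema = {
        "type": "struct",
        "name": name,  # hypothetical struct name
        "optional": False,
        "fields": [
            {"field": field_name, "type": "string", "optional": True}
            for field_name in record
        ],
    }
    return json.dumps({"schema": schema, "payload": record})

# Wrap a small sample record and inspect the result.
envelope = json.loads(wrap_with_schema({"f1": "qws", "f2": "zcz"}))
```

The wrapped message then satisfies the converter's requirement of exactly two top-level fields, `schema` and `payload`.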