
My Kafka Connect JDBC source connector works for a simple table, but for most other tables it fails with this error:

Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Invalid decimal scale: 127 (greater than precision: 64)
    at org.apache.avro.LogicalTypes$Decimal.validate(LogicalTypes.java:231)
    at org.apache.avro.LogicalType.addToSchema(LogicalType.java:68)
    at org.apache.avro.LogicalTypes$Decimal.addToSchema(LogicalTypes.java:201)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:943)
    at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1058)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:899)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:731)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:725)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:364)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more

I create the connector with the following command:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "jdbc_source_oracle_03",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:oracle:thin:@//XOXO:1521/XOXO",
      "connection.user": "XOXO",
      "connection.password": "XOXO",
      "numeric.mapping": "best_fit",
      "mode": "timestamp",
      "poll.interval.ms": "1000",
      "validate.non.null": "false",
      "table.whitelist": "POLICY",
      "timestamp.column.name": "CREATED_DATE",
      "topic.prefix": "ora-",
      "transforms": "addTopicSuffix,InsertTopic,InsertSourceDetails,copyFieldToKey,extractValuefromStruct",
      "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.InsertTopic.topic.field": "messagetopic",
      "transforms.InsertSourceDetails.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.InsertSourceDetails.static.field": "messagesource",
      "transforms.InsertSourceDetails.static.value": "JDBC Source Connector from Oracle on asgard",
      "transforms.addTopicSuffix.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.addTopicSuffix.regex": "(.*)",
      "transforms.addTopicSuffix.replacement": "$1-jdbc-02",
      "transforms.copyFieldToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
      "transforms.copyFieldToKey.fields": "ID",
      "transforms.extractValuefromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
      "transforms.extractValuefromStruct.field": "ID"
    }
  }'

This seems to be the relevant part: "Invalid decimal scale: 127 (greater than precision: 64)". Check the handling of the FLOAT columns. - Marmite Bomber

1 Answer


The problem was caused by NUMBER columns declared without precision and scale. Robin Moffatt explains it well here: https://rmoff.net/2018/05/21/kafka-connect-and-oracle-data-types
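
One possible workaround, sketched here under assumptions, is to replace "table.whitelist" with the connector's "query" option and CAST each affected column to a NUMBER with an explicit precision and scale, so the converter no longer sees an undeclared scale. The column name PREMIUM, the type NUMBER(18,2), the connector name, and the file name are all hypothetical; ID, CREATED_DATE, and POLICY come from the question. Note that in query mode "topic.prefix" is used as the full topic name, and the query itself should not contain a WHERE clause when "mode" is "timestamp", since the connector appends its own.

```shell
# Hypothetical config: PREMIUM and NUMBER(18,2) are assumed; adjust them to
# the real columns and to a precision/scale that fits the data.
cat > jdbc_source_oracle_cast.json <<'EOF'
{
  "name": "jdbc_source_oracle_cast",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@//XOXO:1521/XOXO",
    "connection.user": "XOXO",
    "connection.password": "XOXO",
    "numeric.mapping": "best_fit",
    "mode": "timestamp",
    "timestamp.column.name": "CREATED_DATE",
    "validate.non.null": "false",
    "poll.interval.ms": "1000",
    "query": "SELECT ID, CREATED_DATE, CAST(PREMIUM AS NUMBER(18,2)) AS PREMIUM FROM POLICY",
    "topic.prefix": "ora-POLICY"
  }
}
EOF
```

The file can then be submitted the same way as in the question, for example with curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @jdbc_source_oracle_cast.json. The transforms from the original config are left out here only to keep the sketch short.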