We use Kafka Connect to ingest data from an Oracle datasource and write it to HDFS in Avro format. In the Kafka Schema Registry, the schema for one of the datasources looks like this:

{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "fields": [
    {
      "name": "ID",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }....
}

This means the ID column has precision 64. When I try to read these Avro files with Spark, it throws:

Caused by: org.apache.spark.sql.AnalysisException: decimal can only support precision up to 38;
  at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:51)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:60)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
  at org.apache.spark.sql.avro.AvroFileFormat.inferSchema(AvroFileFormat.scala:93)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
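
The limit comes from Spark itself: DecimalType hard-caps precision at DecimalType.MAX_PRECISION, which is 38. A minimal sketch (e.g. pasted into spark-shell) reproduces the same exception without reading any files:

import org.apache.spark.sql.types.DecimalType

// Spark rejects any decimal wider than DecimalType.MAX_PRECISION (38),
// which is exactly what happens when the Avro schema declares precision 64.
val tooWide = DecimalType(64, 0)
// org.apache.spark.sql.AnalysisException: decimal can only support precision up to 38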

My code snippet to read the Avro files is:

def readSchemaOfAvroPartition(avroLocation: String, partitionColumn: String, partitionValue: String): StructType = {
  sparkSession.read.format(AVRO)
    .load(s"${avroLocation}/${partitionColumn}=${partitionValue}")
    .schema
}
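
For reference, the helper is called like this (the HDFS location and partition names below are just placeholders):

// Placeholder path and partition column, matching the layout written by the HDFS sink connector.
val schema = readSchemaOfAvroPartition(
  avroLocation = "hdfs:///data/oracle/my_table",
  partitionColumn = "ingest_date",
  partitionValue = "2019-01-01")
// Fails during schema inference (AvroFileFormat.inferSchema) with the AnalysisException above.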

According to the Oracle documentation, the maximum precision should be 38: https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313

How can I force Kafka Connect to register this column with precision 38 instead of 64?

1 Answer

This is not necessarily a bug in the Kafka connector, but rather a consequence of how Kafka Connect handles numeric types in general. In many databases a NUMERIC or DECIMAL column is defined with a precision and scale, and when these are not specified explicitly the defaults depend on the database technology.

Kafka Connect doesn't have a good way of handling this consistently across databases and non-database file systems.

Further details can be found here:

https://github.com/confluentinc/kafka-connect-jdbc/issues/563

https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector#bytes-decimals-numerics

https://gist.github.com/rmoff/7bb46a0b6d27982a5fb7a103bb7c95b9#file-oracle-md
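
Until the connector handles this better, one possible workaround is on the consumer side: instead of letting Spark infer the schema from the files, pass a corrected Avro schema (with the precision lowered to 38) through the avroSchema read option of the Spark Avro data source. A minimal sketch, assuming the record shape from the question and a placeholder HDFS path; whether the narrower reader schema is safe depends on the actual ID values fitting into 38 digits:

import org.apache.spark.sql.SparkSession

object ReadAvroWithCappedPrecision {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("avro-precision-workaround")
      .getOrCreate()

    // Same record shape as the registered schema, but with the decimal
    // precision lowered to Spark's maximum of 38. Adjust the field list
    // to match your actual topic schema.
    val readerSchema =
      """{
        |  "type": "record",
        |  "name": "ConnectDefault",
        |  "namespace": "io.confluent.connect.avro",
        |  "fields": [
        |    {
        |      "name": "ID",
        |      "type": ["null", {
        |        "type": "bytes",
        |        "logicalType": "decimal",
        |        "precision": 38,
        |        "scale": 0
        |      }],
        |      "default": null
        |    }
        |  ]
        |}""".stripMargin

    val df = spark.read
      .format("avro")                      // requires the spark-avro module on the classpath
      .option("avroSchema", readerSchema)  // bypasses schema inference from the files
      .load("hdfs:///data/oracle/my_table/ingest_date=2019-01-01")  // placeholder path

    df.printSchema()
  }
}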