We use Kafka Connect to ingest data from an Oracle data source and write it to HDFS in Avro format. In the Kafka Schema Registry, the schema for one of the data sources looks like this:
{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "fields": [
    {
      "name": "ID",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    },
    ...
  ]
}
This means the ID column was registered with precision 64. When I try to read these Avro files with Spark, it throws:
Caused by: org.apache.spark.sql.AnalysisException: decimal can only support precision up to 38;
  at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:51)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:60)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
  at org.apache.spark.sql.avro.AvroFileFormat.inferSchema(AvroFileFormat.scala:93)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
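As the trace shows, the limit is enforced in Spark's DecimalType constructor itself (DecimalType.MAX_PRECISION is 38), so any Avro decimal wider than 38 fails during schema inference, before a single row is read. A minimal illustration:

import org.apache.spark.sql.types.DecimalType

val ok = DecimalType(38, 0)   // DecimalType.MAX_PRECISION, accepted
val boom = DecimalType(64, 0) // throws the AnalysisException above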
My code snippet to read the Avro files is:
def readSchemaOfAvroPartition(avroLocation: String, partitionColumn: String, partitionValue: String): StructType = {
  sparkSession.read.format(AVRO)
    .load(s"${avroLocation}/${partitionColumn}=${partitionValue}")
    .schema
}
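As a stopgap we could bypass inference by supplying an explicit schema. A minimal sketch, assuming the values in ID actually fit into 38 digits (the hand-written schema below is hypothetical, and the remaining columns would have to be declared the same way):

import org.apache.spark.sql.types.{DecimalType, StructField, StructType}

// Hypothetical hand-written schema capping ID at Spark's maximum precision.
val explicitSchema = StructType(Seq(
  StructField("ID", DecimalType(38, 0), nullable = true)
  // ... the other columns of this data source
))

val df = sparkSession.read.format(AVRO)
  .schema(explicitSchema) // a user-supplied schema skips inferSchema
  .load(s"${avroLocation}/${partitionColumn}=${partitionValue}")

But maintaining hand-written schemas for every data source defeats the purpose of reading the schema from the files, so I would rather fix the registered precision at the source.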
According to the Oracle documentation, the maximum precision of a NUMBER column is 38: https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313
How can I force Kafka Connect to register this schema with precision 38 instead of 64?
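For reference, this is the kind of connector-level switch I have been looking for. A sketch assuming the Confluent JDBC source connector; the connection details are placeholders, and I have not verified that its documented numeric.mapping option actually changes the precision the AvroConverter registers:

{
  "name": "oracle-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@//dbhost:1521/ORCL",
    "numeric.mapping": "best_fit"
  }
}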