
Are enum fields supported in Kafka Connect? If not, what is the usual workaround? I'm looking at the Kafka 2.6.0 ConnectSchema API here: https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/data/ConnectSchema.html

I'm trying to follow best practice by using the Confluent Schema Registry (with Avro), but I can't get my custom source connector to generate a schema containing enums that matches an existing schema (the output topic has other producers besides the connector). A workaround would be to simply use strings, but that undermines the whole point of a schema, doesn't it?

Enums in Avro are converted to strings in certain sink connectors, but there is a property named something like enhanced avro support in the AvroConverter which handles enums differently. - OneCricketeer
Looks like "enhanced avro support" is specific custom code for the S3 Connector? Only references I could find are github.com/confluentinc/schema-registry/issues/1306 and docs.confluent.io/current/connect/kafka-connect-s3/… - Ryan
It should be in the Avro converter or deserializer. - OneCricketeer
Looks like much of the logic is here: github.com/confluentinc/schema-registry/blob/… . Support appears shaky, as schema equivalence based on doc and defaults might be problematic too: github.com/confluentinc/schema-registry/issues/1042 - Ryan

1 Answer


There is no approach that works in general, but with Avro you can enable a converter-specific setting and then supply hints through the Schema parameters of the enum field. I was able to provide the hints using a custom Connect Transform.

Configure the Connect converter with:

    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://registry:8081",
    "value.converter.enhanced.avro.schema.support": true,
    "value.converter.connect.meta.data": false,
    "transforms": "alarms",
    "transforms.alarms.type": "org.jlab.kafka.connect.transforms.EpicsToAlarm$Value"

Then the custom Transform contains:

    final Schema prioritySchema = SchemaBuilder
            .string()
            .doc("Alarm severity organized as a way for operators to prioritize which alarms to take action on first")
            .parameter("io.confluent.connect.avro.enum.doc.AlarmPriority", "Enumeration of possible alarm priorities")
            .parameter("io.confluent.connect.avro.Enum", "org.jlab.kafka.alarms.AlarmPriority")
            .parameter("io.confluent.connect.avro.Enum.1", "P1_LIFE")
            .parameter("io.confluent.connect.avro.Enum.2", "P2_PROPERTY")
            .parameter("io.confluent.connect.avro.Enum.3", "P3_PRODUCTIVITY")
            .parameter("io.confluent.connect.avro.Enum.4", "P4_DIAGNOSTIC")
            .build();
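If several fields need enum hints, the repeated `.parameter(...)` calls can be generated instead of written by hand. The following is only a sketch that mirrors the key pattern used above (the enum's full name under `io.confluent.connect.avro.Enum`, one numbered key per symbol, and an optional doc key); the class `EnumHints` and the method `enumParameters` are hypothetical names, not part of Kafka Connect or the Confluent converter. How the converter interprets these keys may differ between versions, so treat the layout as an assumption to verify against the converter you run.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of any library): builds the schema parameters
// that act as enum hints, following the key pattern shown in the answer.
public class EnumHints {
    static final String ENUM_KEY = "io.confluent.connect.avro.Enum";
    static final String ENUM_DOC_PREFIX = "io.confluent.connect.avro.enum.doc.";

    static Map<String, String> enumParameters(String fullName, String doc, List<String> symbols) {
        // LinkedHashMap keeps the entries in insertion order within this map
        Map<String, String> params = new LinkedHashMap<>();
        if (doc != null) {
            // The doc key is suffixed with the simple (unqualified) enum name
            String simpleName = fullName.substring(fullName.lastIndexOf('.') + 1);
            params.put(ENUM_DOC_PREFIX + simpleName, doc);
        }
        params.put(ENUM_KEY, fullName);
        // One numbered key per symbol, starting at 1, with the symbol as the value
        for (int i = 0; i < symbols.size(); i++) {
            params.put(ENUM_KEY + "." + (i + 1), symbols.get(i));
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = enumParameters(
                "org.jlab.kafka.alarms.AlarmPriority",
                "Enumeration of possible alarm priorities",
                Arrays.asList("P1_LIFE", "P2_PROPERTY", "P3_PRODUCTIVITY", "P4_DIAGNOSTIC"));
        // Print each generated key/value pair
        params.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting map could then be handed to the builder in one call, for example `SchemaBuilder.string().parameters(params).build()`, in place of the individual `.parameter(...)` lines.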

Full Transform Source