Context
I wrote a couple of small Kafka Connect connectors: one generates random data every second and the other logs it to the console. They're integrated with a Schema Registry, so the data is serialized with Avro.
I deployed them into a local Kafka environment using the fast-data-dev Docker image provided by Landoop.
The basic setup works: it produces one message per second, and each message is logged.
However, I want to change the subject name strategy. The default one generates two subjects:
${topic}-key
${topic}-value
As per my use case, I'll need to generate events with different schemas that will end up on the same topic. Therefore, the subject names I need are:
${topic}-${keyRecordName}
${topic}-${valueRecordName}
As per the docs, my needs fit the TopicRecordNameStrategy.
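To make the difference concrete, here is a minimal sketch of how I understand the two naming schemes. The helper functions, the topic name, and the record names are made up for illustration; they only mimic the naming rules and are not the Confluent strategy classes.

```kotlin
// Illustrative only: these helpers mimic how each strategy names a subject.
// They are hypothetical and are not part of the Confluent libraries.

// Default strategy (TopicNameStrategy): one subject per topic and per key/value side.
fun topicNameSubject(topic: String, isKey: Boolean): String =
    if (isKey) "$topic-key" else "$topic-value"

// TopicRecordNameStrategy: one subject per topic and per fully qualified record name,
// so several record types can share a topic without sharing a subject.
fun topicRecordNameSubject(topic: String, recordFullName: String): String =
    "$topic-$recordFullName"

fun main() {
    val topic = "sample-topic" // hypothetical topic name

    // With the default strategy, every value schema lands in the same subject.
    println(topicNameSubject(topic, isKey = false))

    // With TopicRecordNameStrategy, each record type gets its own subject.
    println(topicRecordNameSubject(topic, "com.example.UserCreated"))
    println(topicRecordNameSubject(topic, "com.example.UserDeleted"))
}
```

With the second scheme, two event types written to the same topic are registered and evolved independently, which is what I'm after.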
What I have tried
I create the avroData object for sending values to Connect:
class SampleSourceConnectorTask : SourceTask() {
    private lateinit var avroData: AvroData

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }
and use it afterwards to create the SourceRecord response objects.
The documentation states that in order to use the Schema Registry in Kafka Connect I have to set some properties in the connector config. Therefore, when I create it I add them:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Problem
The connector seems to ignore those properties and keeps using the old ${topic}-key and ${topic}-value subjects.
Question
Kafka Connect is supposed to support different subject name strategies. I managed to work around the issue by writing my own version of the AvroConverter with the subject strategy I need hardcoded. However, this doesn't look like a good approach, and it also caused issues when trying to consume the data with the Sink Kafka Connector. I duplicated the subject so that there is also a version with the old name (${topic}-key), and it works.
What is the proper setup for specifying the subject name strategy in Kafka Connect?