Context

I wrote a couple of small Kafka Connect connectors: one generates random data every second, and the other logs it to the console. Both are integrated with a Schema Registry, so the data is serialized with Avro.

I deployed them to a local Kafka environment using the fast-data-dev Docker image provided by Landoop.

The basic setup works: a message is produced each second and then logged.

However, I want to change the subject name strategy. The default one generates two subjects:

  • ${topic}-key
  • ${topic}-value

As per my use case, I'll need to generate events with different schemas that will end up on the same topic. Therefore, the subject names I need are:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

According to the docs, my needs fit the TopicRecordNameStrategy.
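To make the difference between the two naming schemes concrete, here is a minimal sketch in plain Kotlin. It only mimics the naming rules described above and is not the Schema Registry client code; the function names, the topic `events`, and the record names are hypothetical. It assumes the record name is the fully qualified Avro record name.

```kotlin
// Sketch of the default naming scheme: one subject per topic and per key/value side.
fun topicNameSubject(topic: String, isKey: Boolean): String =
    if (isKey) "$topic-key" else "$topic-value"

// Sketch of the TopicRecordNameStrategy scheme: one subject per topic and record type.
// Assumption: recordName is the fully qualified Avro record name.
fun topicRecordNameSubject(topic: String, recordName: String): String =
    "$topic-$recordName"

fun main() {
    // Hypothetical topic and record names, for illustration only.
    println(topicNameSubject("events", isKey = false))
    println(topicRecordNameSubject("events", "com.example.UserCreated"))
    println(topicRecordNameSubject("events", "com.example.UserDeleted"))
}
```

With the second scheme, two different record types on the same topic get separate subjects, so their schemas do not have to be compatible with each other.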

What I have tried

I create the avroData object for sending values to connect:

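import io.confluent.connect.avro.AvroData
import io.confluent.connect.avro.AvroDataConfig
import org.apache.kafka.connect.source.SourceTask

class SampleSourceConnectorTask : SourceTask() {

    // Converts between Avro and Connect data; initialized in start()
    private lateinit var avroData: AvroData

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }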

and use it afterwards to create the SourceRecord response objects.

The documentation states that in order to use the Schema Registry in Kafka Connect I have to set some properties in the connector config. Therefore, when I create it I add them:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

Problem

The connector seems to ignore those properties and keeps using the old ${topic}-key and ${topic}-value subjects.

Question

Kafka Connect is supposed to support different subject name strategies. I managed to work around the issue by writing my own version of the AvroConverter with the subject strategy I need hardcoded. However, this doesn't look like a good approach, and it also caused issues when trying to consume the data with the Sink Kafka Connector. I duplicated the subject so that there is also a version with the old name (${topic}-key), and with that it works.

What is the proper setup for specifying the subject name strategy in Kafka Connect?

1 Answer

You're missing the key.converter. and value.converter. prefixes, which are required for the config to be passed through to the converter. So instead of:

key.subject.name.strategy
value.subject.name.strategy

you want:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy
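Applied to the connector config from the question, the change would look roughly like this. Only the last two property names differ; connector.class stays elided as in the question, and the registry URL is the one used there.

```properties
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Prefixed so that the setting is handed to the key converter
key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
# Prefixed so that the setting is handed to the value converter
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
```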

Source: https://docs.confluent.io/current/connect/managing/configuring.html

To pass configuration parameters to key and value converters, prefix them with key.converter. or value.converter. as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in the key.converter or value.converter properties.
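The prefix rule quoted above can be pictured with a small sketch in plain Kotlin. This is a simplified model of the pass-through and not Kafka Connect's actual code; the function name `configForConverter` is hypothetical. It keeps only the entries that start with the converter prefix and strips that prefix before handing them over, which is why an unprefixed key.subject.name.strategy never reaches the converter.

```kotlin
// Simplified model (assumption): only entries starting with the given prefix
// are passed to the converter, with the prefix removed from each key.
fun configForConverter(
    connectorConfig: Map<String, String>,
    prefix: String
): Map<String, String> =
    connectorConfig
        .filterKeys { it.startsWith(prefix) }
        .mapKeys { it.key.removePrefix(prefix) }

fun main() {
    val strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
    val connectorConfig = mapOf(
        "key.converter" to "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url" to "http://localhost:8081",
        // Unprefixed: filtered out, so the converter never sees it.
        "key.subject.name.strategy" to strategy
    )
    println(configForConverter(connectorConfig, "key.converter."))

    // Prefixed: the converter receives it as key.subject.name.strategy.
    val fixedConfig = connectorConfig - "key.subject.name.strategy" +
        ("key.converter.key.subject.name.strategy" to strategy)
    println(configForConverter(fixedConfig, "key.converter."))
}
```

In this model the first map contains only schema.registry.url, while the second one also contains key.subject.name.strategy, which matches the behaviour described in the question and the fix given above.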