5
votes

I have a Kafka cluster already running in production, with a topic named "existing-topic". I am using the Debezium MongoDB source connector.

All I want is to push the CDC events directly to "existing-topic", so that the consumers already listening to that topic will process them.

I couldn't find any resource explaining how to do this. The documentation only mentions that the topic is created with the following format:

"If your mongodb.name parameter is A, database name is B and collection name is C, the data from database B and collection C will be loaded under the topic A.B.C"

Can I change the topic to "existing-topic" and push the events to it?

2 Answers

2
votes

According to the documentation,

The name of the Kafka topics always takes the form logicalName.databaseName.collectionName, where logicalName is the logical name of the connector as specified with the mongodb.name configuration property, databaseName is the name of the database where the operation occurred, and collectionName is the name of the MongoDB collection in which the affected document existed.


This means that if your connector's logical name (the mongodb.name property, not the connector's name) is myConnector and your database myDatabase has two collections, users and orders:

{
  "name": "myConnector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "mongo-db-host:27017",
    "mongodb.name": "myConnector",
    "collection.whitelist": "myDatabase[.]*"
  }
}

then Kafka Connect will populate two topics with names:

  • myConnector.myDatabase.users
  • myConnector.myDatabase.orders

If you still want to change the name of the target topic, you can use Kafka Connect Single Message Transforms (SMTs). More precisely, ExtractTopic, an SMT provided by Confluent, should help. Note that it takes the topic name from a field in the key or value of the message, so you need to include the desired topic name in the payload somehow.

For example, the following SMT will extract the value of field myField and use this as the record's topic:

 transforms=ValueFieldExample
 transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
 transforms.ValueFieldExample.field=myField
2
votes

I was facing the same problem with the JDBC Source Connector and found a different solution:

Using the RegexRouter Single Message Transform (aliased dropPrefix below), you can override the whole topic name:

"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": "A\\.B\\.C",               // the whole generated topic name, dots escaped
"transforms.dropPrefix.replacement": "existing-topic"    // the whole existing topic name

Because the pattern is a regular expression, you should be able to make the rule dynamic if you capture multiple tables/collections and the generated topic name isn't constant.
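
For the MongoDB case in the question, the same idea could be sketched as a full connector configuration. This is a minimal sketch under stated assumptions, not a tested setup: the connector name and the transform alias route are made up for illustration, A and B stand for the logical name and database from the question, and the host is a placeholder. The regex A\.B\..* (written with doubled backslashes in JSON) is meant to match every collection topic under A.B and send it to existing-topic.

```json
{
  "name": "mongo-cdc-to-existing-topic",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "mongo-db-host:27017",
    "mongodb.name": "A",
    "collection.whitelist": "B[.].*",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "A\\.B\\..*",
    "transforms.route.replacement": "existing-topic"
  }
}
```

RegexRouter only renames a topic when the regex matches its full name, so any topic outside A.B would keep its generated name.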

It's a bit hacky, since technically I'm dropping the whole topic name and then adding a new one, which isn't the cleanest solution, to me anyway.