I need to migrate a SQL Server database from on prem location to GoogleCloud Using confluent/kafka to do it
I do have a source debezium connector
{
"name": "mssql_src",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
...
...
...
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "source_dbname.dbo(.*)",
"transforms.Reroute.topic.replacement": "target_dbname$1"
}
}
reroute transformation does not work with conjunction of unwrap, I still getting source_dbname.dbo.* topics instead of target_dbname.*
I need insert data into target_dbname database jdbc sync connector has following configuration
{
"name": "mssql_trg",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "source_dbname.dbo.*",
"table.name.format": "${topic}",
"connection.url": "jdbc:sqlserver://xxx.xxx.xxx.xxx:1433;DatabaseName=rocketlawyer3",
"connection.user": "sqlserver",
"connection.password": "sqlserver",
"dialect.name": "SqlServerDatabaseDialect",
"insert.mode": "upsert",
"auto.create": true,
"auto.evolve": true,
"pk.mode": "record_value"
}
}
Obviously it is failed because all SQL operations are referring tables as source_database_name.dbo.table_name
Here is 2 questions:
How can I change this string source_database_name.dbo.table_name just for table_name using table.name.format or other option
How can I make both transformation (reroute and unwrap) work in source connector