I need to migrate a SQL Server database from an on-premises location to Google Cloud, and I am using Confluent/Kafka to do it.

I have a Debezium source connector:

{
  "name": "mssql_src",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    ...
    ...
    ...
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,source.ts_ms",
    "transforms.unwrap.delete.handling.mode": "rewrite",

    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "source_dbname.dbo(.*)",
    "transforms.Reroute.topic.replacement": "target_dbname$1"
  }
}

The Reroute transformation does not work in conjunction with unwrap: I still get source_dbname.dbo.* topics instead of target_dbname.*

I need to insert the data into the target_dbname database. The JDBC sink connector has the following configuration:

{
  "name": "mssql_trg",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics.regex": "source_dbname.dbo.*",
    "table.name.format": "${topic}",
    "connection.url": "jdbc:sqlserver://xxx.xxx.xxx.xxx:1433;DatabaseName=rocketlawyer3",
    "connection.user": "sqlserver",
    "connection.password": "sqlserver",
    "dialect.name": "SqlServerDatabaseDialect",
    "insert.mode": "upsert",
    "auto.create": true,
    "auto.evolve": true,
    "pk.mode": "record_value"
  }
}

Obviously it fails, because all SQL operations refer to the tables as source_database_name.dbo.table_name.

I have two questions:

  1. How can I reduce the string source_database_name.dbo.table_name to just table_name, using table.name.format or some other option?

  2. How can I make both transformations (Reroute and unwrap) work in the source connector?

1 Answer

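You need to chain the transformations. A connector config can carry only one transforms key, so declaring it twice leaves only one of the two SMTs configured. List both aliases, unwrap and Reroute, in a single comma-separated value instead. The Reroute regex below captures the three dot-separated parts of the topic name and keeps only the third, so each topic is named after the table alone: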

{
  "name": "mssql_src",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    ...
    ...
    ...
    "transforms": "unwrap, Reroute",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,source.ts_ms",
    "transforms.unwrap.delete.handling.mode": "rewrite",

    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.Reroute.topic.replacement": "$3"
  }
}
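
If the source topics have to keep their source_dbname.dbo.table_name names, the renaming could be done on the sink side instead. The following is only a sketch: it reuses the sink settings from the question and adds Kafka Connect's built-in RegexRouter SMT, so that ${topic} in table.name.format resolves to the bare table name. The alias dropPrefix is a made-up name, and the regex assumes every topic starts with source_dbname.dbo.

```json
{
  "name": "mssql_trg",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics.regex": "source_dbname\\.dbo\\..*",
    "table.name.format": "${topic}",
    "connection.url": "jdbc:sqlserver://xxx.xxx.xxx.xxx:1433;DatabaseName=rocketlawyer3",
    "connection.user": "sqlserver",
    "connection.password": "sqlserver",
    "dialect.name": "SqlServerDatabaseDialect",
    "insert.mode": "upsert",
    "auto.create": true,
    "auto.evolve": true,
    "pk.mode": "record_value",

    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "source_dbname\\.dbo\\.(.*)",
    "transforms.dropPrefix.replacement": "$1"
  }
}
```

If the source connector already reroutes its topics to bare table names as shown above, this sink-side transform is not needed, and topics.regex has to match the rerouted topic names instead.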