I'm using Kafka Connect from the Confluent Community Platform to keep MySQL databases synchronized; both the sources and the sinks are MySQL databases. It isn't working.
There are several issues in my situation:

1. There are tables in other databases on the same server that I don't want to read into Kafka, but the Kafka Connect source keeps trying to read those other databases (see the sketch after this list).
2. I want to use org.apache.kafka.connect.json.JsonConverter in both the source connector and the sink connector, but the sink connector can't insert the records correctly.
3. I want to synchronize several databases, and tables in different databases may have the same table names. How do I avoid table-name conflicts and get the sink connectors to route the Kafka topics to insert data into the right databases?

[MySQL synchronization illustration]
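For issue 1, here is a minimal sketch of the restriction I think I need. I'm not sure database.whitelist is even a valid property for the JDBC source connector (it may be silently ignored, which would explain why tables from other databases keep showing up); as far as I can tell the supported options are table.whitelist / table.blacklist, and possibly catalog.pattern or schema.pattern. The table names below are just examples:

{
  "name": "example-whitelist-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/br_auths?user=root&password=123456",
    "table.whitelist": "auths_users,auths_roles",
    "mode": "timestamp",
    "timestamp.column.name": "utime",
    "topic.prefix": "br_auths__"
  }
}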
The Kafka JDBC Source Connector config file is:
{
  "name": "br-auths-3910472223-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/br_auths?user=root&password=123456",
    "database.whitelist": "br_auths",
    "table.blacklist": "br_auths.__migrationversions,br_auths.auths_service_apps",
    "mode": "timestamp",
    "timestamp.column.name": "utime",
    "validate.non.null": "false",
    "incrementing.column.name": "id",
    "topic.prefix": "br_auths__"
  }
}
The Kafka JDBC Sink Connector config file is:
{
  "name": "br-auths-3910472223-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://rm-hp303a0n2vr8970.mysql.huhehaote.rds.aliyuncs.com:3306/dev-br-auths-391047222?user=br_auths&password=@123456",
    "topics": "br_auths__auths_roles,br_auths__auths_user_logins,br_auths__auths_user_roles,br_auths__auths_users,br_auths__auths_user_claims,br_auths__auths_user_tokens,br_auths__auths_role_claims",
    "auto.create": "true",
    "insert.mode": "upsert",
    "transforms": "dropTopicPrefix",
    "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropTopicPrefix.regex": "br_auths__(.*)",
    "transforms.dropTopicPrefix.replacement": "$1"
  }
}
I want to create several pairs of source and sink connectors for different databases, so that the whitelisted tables in database A on MySQL server-A are synchronized incrementally to database A on MySQL server-B.
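To make this concrete, here is a hedged sketch of the naming scheme I have in mind: each source connector encodes the server and database into the topic prefix, and the matching sink connector subscribes by regex and strips the prefix back off with RegexRouter, so identically named tables from different databases land in different topics but map back to plain table names in the right target database. The host name, database name, credentials, and the prefix serverA_dbA__ are made up for illustration.

Source connector fragment:

{
  "topic.prefix": "serverA_dbA__"
}

Matching sink connector fragment (one sink per target database):

{
  "connection.url": "jdbc:mysql://server-b:3306/dbA?user=sync&password=secret",
  "topics.regex": "serverA_dbA__(.*)",
  "transforms": "dropPrefix",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.dropPrefix.regex": "serverA_dbA__(.*)",
  "transforms.dropPrefix.replacement": "$1"
}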
Update 1:
I switched to connect-avro-distributed, with the Debezium MySQL source connector and the Confluent JDBC sink connector. The source connector is:
{
  "name": "br-auths-3910472223-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "br123456",
    "database.useLegacyDatetimeCode": "false",
    "database.server.id": "184",
    "database.server.name": "local3910472223",
    "database.whitelist": "br_auths",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.br-auths.local3910472223",
    "table.blacklist": "br_auths.__migrationversions,br_auths.auths_service_apps",
    "include.schema.changes": "true",
    "transforms": "route,TimestampConverter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.target.type": "string",
    "transforms.TimestampConverter.field": "payload.after.ctime",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$2__$3"
  }
}
And the sink connector is:
{
  "name": "br-auths-3910472223-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://rm-hp303a0n2.mysql.huhehaote.rds.aliyuncs.com:3306/dev-br-auths-391047222?useLegacyDatetimeCode=false&user=br_auths&password=123456",
    "dialect.name": "MySqlDatabaseDialect",
    "topics.regex": "br_auths__(.*)",
    "transforms": "dropTopicPrefix,unwrap",
    "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropTopicPrefix.regex": "br_auths__(.*)",
    "transforms.dropTopicPrefix.replacement": "$1",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "insert.mode": "upsert",
    "pk.fields": "Id",
    "pk.mode": "record_value"
  }
}
The Avro message, converted to JSON, looks like this:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "Id" },
          { "type": "string", "optional": false, "field": "UserId" },
          { "type": "string", "optional": false, "field": "RoleId" },
          { "type": "string", "optional": true, "field": "APPID" },
          { "type": "int32", "optional": false, "default": 0, "field": "IsDeleted" },
          { "type": "int64", "optional": false, "name": "io.debezium.time.Timestamp", "version": 1, "default": 0, "field": "ctime" },
          { "type": "int64", "optional": false, "name": "io.debezium.time.Timestamp", "version": 1, "default": 0, "field": "utime" }
        ],
        "optional": true,
        "name": "local3910472223.br_auths.auths_user_roles.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "Id" },
          { "type": "string", "optional": false, "field": "UserId" },
          { "type": "string", "optional": false, "field": "RoleId" },
          { "type": "string", "optional": true, "field": "APPID" },
          { "type": "int32", "optional": false, "default": 0, "field": "IsDeleted" },
          { "type": "int64", "optional": false, "name": "io.debezium.time.Timestamp", "version": 1, "default": 0, "field": "ctime" },
          { "type": "int64", "optional": false, "name": "io.debezium.time.Timestamp", "version": 1, "default": 0, "field": "utime" }
        ],
        "optional": true,
        "name": "local3910472223.br_auths.auths_user_roles.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": true, "field": "version" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "server_id" },
          { "type": "int64", "optional": false, "field": "ts_sec" },
          { "type": "string", "optional": true, "field": "gtid" },
          { "type": "string", "optional": false, "field": "file" },
          { "type": "int64", "optional": false, "field": "pos" },
          { "type": "int32", "optional": false, "field": "row" },
          { "type": "boolean", "optional": true, "default": false, "field": "snapshot" },
          { "type": "int64", "optional": true, "field": "thread" },
          { "type": "string", "optional": true, "field": "db" },
          { "type": "string", "optional": true, "field": "table" },
          { "type": "string", "optional": true, "field": "query" }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" }
    ],
    "optional": false,
    "name": "local3910472223.br_auths.auths_user_roles.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "Id": "DB4DA841364860D112C3C76BDCB36635",
      "UserId": "0000000000",
      "RoleId": "5b7e5f9b4bc00d89c4cf96ae",
      "APPID": "br.region2",
      "IsDeleted": 0,
      "ctime": 1550138524000,
      "utime": 1550138524000
    },
    "source": {
      "version": "0.8.3.Final",
      "name": "local3910472223",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 64606,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "br_auths",
      "table": "auths_user_roles",
      "query": null
    },
    "op": "c",
    "ts_ms": 1550568556614
  }
}
The columns that use the MySQL datetime type were serialized to big integers (epoch milliseconds), so the JDBC sink connector failed when it tried to insert those values into MySQL datetime columns. I therefore added the transforms.TimestampConverter entries shown above to the source connector config, but the ctime and utime columns didn't change. What's wrong?
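My unverified guess is that the stock TimestampConverter SMT only operates on top-level fields of the record value, and that the payload. prefix only exists in the JSON serialization, not in the Connect record the transform actually sees: inside the source connector the value is still the Debezium envelope, where ctime sits nested under after, so "payload.after.ctime" matches nothing. If that's right, the conversion would have to run in the sink connector after unwrap has flattened the envelope. A sketch of the sink transform chain I would try (each TimestampConverter instance handles exactly one field, hence two entries):

{
  "transforms": "dropTopicPrefix,unwrap,convertCtime,convertUtime",
  "transforms.convertCtime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.convertCtime.field": "ctime",
  "transforms.convertCtime.target.type": "Timestamp",
  "transforms.convertUtime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.convertUtime.field": "utime",
  "transforms.convertUtime.target.type": "Timestamp"
}

(dropTopicPrefix and unwrap would keep their existing configuration from the sink connector above; target.type Timestamp should make the JDBC sink write proper datetime values instead of big integers.)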