I've created Debezium connector in Kafka Connect. It works, but if I want to add any SMT, it breaks - there is different structure of message for DDL changes (for example changes in table schema) and different for changes in rows.
{
"name": "test_debezium",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": localhost",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "***",
"database.server.name": "test-debezium",
"database.include.list": "database",
"table.include.list": "database.table",
"database.history.kafka.topic": "test_debezium_history",
"database.history.kafka.recovery.poll.interval.ms": 5000,
"database.history.kafka.bootstrap.servers": "localhost:9092",
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "client_id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "client_id"
}
}
How to separate database.history
messages and database.table
topic, about changes in rows?
example of DDL message:
{"databaseName":"database"}-{"source":{"version":"1.5.0-SNAPSHOT","connector":"mysql","name":"test-debezium","ts_ms":0,"snapshot":{"string":"true"},"db":"database","table":null,"server_id":0,"gtid":null,"file":"mysql-bin.000","pos":000,"row":0,"thread":null,"query":null},"databaseName":"database","ddl":"CREATE DATABASE IF EXISTS `database`"}
example of row
message:
{"client_id":328}-{"before":null,"after":{"test-debezium.database.table.Value":{"client_id":328,"first_name":"Ignacy","uuid":"000"}},"source":{"version":"1.5.0-SNAPSHOT","connector":"mysql","name":"test-debezium","ts_ms":0,"snapshot":{"string":"true"},"db":"database","table":{"string":"table"},"server_id":0,"gtid":null,"file":"mysql-bin.000","pos":000,"row":0,"thread":null,"query":null},"op":"c","ts_ms":{"long":1611301059407},"transaction":null}
so the same extractor for Key doesn't work - how to use different extractor for different topics or how to use extractor just in row
messages?
Thanks!