To understand how CDC works, I have been working through the following example from the Debezium site: https://debezium.io/blog/2018/03/08/creating-ddd-aggregates-with-debezium-and-kafka-streams/
In that example, I tried to change the sink connector from MongoDB to Elasticsearch. When I start the Elasticsearch sink connector, it fails with the following error:
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
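To make the condition in that message concrete, here is a minimal Python sketch of the check as the error text describes it: a message value must be a JSON object with exactly the top-level fields "schema" and "payload". This only mirrors the wording of the error and is not Kafka Connect's actual code; the function name `is_schema_envelope` and both sample messages are hypothetical.

```python
import json


def is_schema_envelope(raw: str) -> bool:
    """Return True if raw is a JSON object whose only top-level keys are 'schema' and 'payload'."""
    value = json.loads(raw)
    return isinstance(value, dict) and set(value.keys()) == {"schema", "payload"}


# Hypothetical message that carries its schema alongside the data.
enveloped = json.dumps({
    "schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}]},
    "payload": {"id": 1001},
})

# Hypothetical plain JSON message with no schema/payload wrapper.
plain = json.dumps({"id": 1001, "first_name": "Sally"})

print(is_schema_envelope(enveloped))
print(is_schema_envelope(plain))
```

If any of the topics the sink reads from contains values shaped like the second sample, a converter running with schemas.enable=true would reject them in the way the error describes.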
The MySQL Debezium source connector properties are as follows (please ignore the connection URL):
{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "cdc",
    "database.password": "passwrod",
    "database.server.id": "1840514",
    "database.server.name": "dbserver1",
    "table.whitelist": "inventory.customers,inventory.addresses",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
The Elasticsearch sink connector is the one used in this example: https://debezium.io/blog/2018/01/17/streaming-to-elasticsearch/
The Elasticsearch sink connector properties are as follows (please ignore the connection URL):
{
  "name": "elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "product-cdc,final_ddd_aggregates,dbserver1.inventory.customers,dbserver1.inventory.addresses",
    "connection.url": "https://localhost:9243",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "key.ignore": "false",
    "schema.ignore": "true",
    "value.converter.schemas.enable": "true",
    "type.name": "final_ddd_aggregates"
  }
}
How can I resolve this error?