
To understand how CDC works, I have been working through the following example from the Debezium site: https://debezium.io/blog/2018/03/08/creating-ddd-aggregates-with-debezium-and-kafka-streams/.

If I change the sink connector in this example from MongoDB to Elasticsearch and then start the es-sink connector, it shows the following error:

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
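For context, when schemas.enable=true the JsonConverter only accepts messages wrapped in a schema/payload envelope. A minimal sketch of the distinction (the field values and the checker function are illustrative, not Connect internals):

```python
import json

# A message the JsonConverter accepts when schemas.enable=true:
# it must contain exactly the "schema" and "payload" fields.
with_schema = json.dumps({
    "schema": {"type": "struct",
               "fields": [{"type": "int32", "field": "id", "optional": False}],
               "optional": False},
    "payload": {"id": 1001}
})

# A plain JSON message, as produced by e.g. a Kafka Streams app that
# serializes its aggregate directly -- this is what triggers the
# DataException unless schemas.enable=false on the sink's converter.
plain = json.dumps({"id": 1001, "first_name": "Sally"})

def looks_like_envelope(raw):
    """Mimic the converter's check: only 'schema' and 'payload' allowed."""
    doc = json.loads(raw)
    return set(doc) == {"schema", "payload"}

print(looks_like_envelope(with_schema))  # True
print(looks_like_envelope(plain))        # False
```

The final_ddd_aggregates topic in the Debezium example is written by a Kafka Streams application as plain JSON, i.e. the second shape above.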

The MySQL Debezium source connector properties are as follows (please ignore the incorrect URL):

{
    "name": "mysql-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "cdc",
        "database.password": "passwrod",
        "database.server.id": "1840514",
        "database.server.name": "dbserver1",
        "table.whitelist": "inventory.customers,inventory.addresses",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.unwrap.drop.tombstones": "false"
    }
}

The Elasticsearch sink connector is the one used in this example: https://debezium.io/blog/2018/01/17/streaming-to-elasticsearch/

The Elasticsearch sink connector properties are as follows (again, please ignore the incorrect URL):

{
    "name": "elastic-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "product-cdc,final_ddd_aggregates,dbserver1.inventory.customers,dbserver1.inventory.addresses",
        "connection.url": "https://localhost:9243",
        "transforms": "unwrap,key",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "key.ignore": "false",
        "schema.ignore": "true",
        "value.converter.schemas.enable": "true",
        "type.name": "final_ddd_aggregates"
    }
}

Please assist me with this.


2 Answers

In your configuration you need to do as the error message tells you and set schemas.enable=false. Using the example from the article, instead of:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": "1",
        "topics": "final_ddd_aggregates",
        "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
        "mongodb.collection": "customers_with_addresses",
        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
        "mongodb.delete.on.null.values": true
    }
}

you would have:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": "1",
        "topics": "final_ddd_aggregates",
        "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
        "mongodb.collection": "customers_with_addresses",
        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
        "mongodb.delete.on.null.values": true,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false"
    }
}

To understand more about converters and serialization, see https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained and also http://rmoff.dev/ksldn19-kafka-connect

As the error message implies, you probably have JSON messages without a schema stored in the topic you are reading from. You either need to enable schemas on the source side or disable them on the sink side.
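To make the two options concrete, here are the converter fragments to merge into the connector configs from the question (a sketch, assuming the default JsonConverter is in use):

```json
{
    "_comment": "Option 1 (sink side): accept plain JSON in the elastic-sink config",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
}
```

```json
{
    "_comment": "Option 2 (source side): make the producing connector emit the schema/payload envelope",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
}
```

Note that in the aggregates example the final_ddd_aggregates topic is written by a Kafka Streams application rather than by Connect, so for that topic only the sink-side option applies unless you also change the Streams serializer.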

Please check this FAQ entry for more details.