I am using the latest Kafka and Confluent JDBC sink connectors and sending a really simple JSON message:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "msg"
      }
    ],
    "optional": false,
    "name": "msgschema"
  },
  "payload": {
    "id": 222,
    "msg": "hi"
  }
}
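For reference, I publish the message as a single line, here sketched with the stock console producer (assuming a broker on localhost:9092; any producer that writes the same bytes should behave the same):

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mynewtopic
{"schema":{"type":"struct","fields":[{"type":"int","optional":false,"field":"id"},{"type":"string","optional":true,"field":"msg"}],"optional":false,"name":"msgschema"},"payload":{"id":222,"msg":"hi"}}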
But I am getting this error:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
JSONLint says the JSON is valid, and I have kept schemas.enable=true in the converter configuration. Any pointers?

I start Connect in standalone mode:
./bin/connect-standalone.sh config/connect-standalone.properties config/sink-mysql.properties
connect-standalone.properties:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
sink-mysql.properties:

name=test-sink-mysql-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
connection.url=jdbc:mysql://localhost:3306/testdb
connection.user=testuser
connection.password=testuser
pk.mode=record_value
pk.fields=id
insert.mode=upsert
auto.create=true
auto.evolve=false
topics=mynewtopic
offset.storage.file.filename=/tmp/mysql-sink.offsets
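To confirm the message actually lands on the topic in the envelope form shown above, it can be read back with the console consumer (a minimal check, again assuming a broker on localhost:9092):

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mynewtopic --from-beginning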