2 votes

Using the latest Kafka and the Confluent JDBC sink connector. I am sending a really simple JSON message:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "msg"
            }
        ],
        "optional": false,
        "name": "msgschema"
    },
    "payload": {
        "id": 222,
        "msg": "hi"
    }
}

But I am getting this error:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

JSONLint says the JSON is valid, and I have set schemas.enable=true in the Kafka Connect configuration. Any pointers?

Can you post your connector and Connect worker config please? – Robin Moffatt
I am starting Kafka Connect using the command: ./bin/connect-standalone.sh config/connect-standalone.properties config/sink-mysql.properties – prabhas
connect-standalone.properties: bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000 – prabhas
sink-mysql.properties: name=test-sink-mysql-jdbc connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=2 connection.url=jdbc:mysql://localhost:3306/testdb connection.user=testuser connection.password=testuser pk.mode=record_value pk.fields=id insert.mode=upsert auto.create=true auto.evolve=false topics=mynewtopic offset.storage.file.filename=/tmp/mysql-sink.offsets – prabhas
I am not using the Schema Registry or Avro. I have added the Confluent JDBC connector JAR and the MySQL JDBC driver JAR to the classpath. – prabhas

2 Answers

5 votes

You need to tell Connect that your schema is embedded in the JSON you're using.

You have:

value.converter=org.apache.kafka.connect.json.JsonConverter 

But you also need:

value.converter.schemas.enable=true
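
If you would rather not change the worker-wide setting, the converter can usually also be overridden per connector. A minimal sketch, assuming you add it to the sink-mysql.properties file from the question (this uses the standard Connect per-connector converter override):

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true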

2 votes

In order to use the JDBC sink connector, your streamed messages must have a schema. This can be achieved either by using Avro with the Schema Registry, or by using JSON with embedded schemas. If schemas.enable=true was only configured after the source had already been running, you might need to delete the topic, re-run the sink, and then start the source side again so that only schema-bearing messages are left on the topic.
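
For instance, a rough sketch of clearing out the old, schema-less messages before re-running, using the topic name from the question (the script is kafka-topics.sh in a plain Apache Kafka install and kafka-topics in the Confluent Platform; older brokers take --zookeeper localhost:2181 instead of --bootstrap-server, and need delete.topic.enable=true):

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic mynewtopic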

An example sink.properties file:

name=sink-mysql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=test-mysql-jdbc-foobar
connection.url=jdbc:mysql://127.0.0.1:3306/demo?user=user1&password=user1pass
auto.create=true

and an example worker configuration file connect-avro-standalone.properties:

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

plugin.path=share/java

and execute:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink.properties
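
To verify things end to end, a minimal sketch assuming the setup above: publish one schema-bearing record to the sink topic with the console producer and then check MySQL. Note the field type is int32 rather than int, which is the schema type name the Connect JSON converter expects; the table name and credentials follow from auto.create=true and the connection.url above.

echo '{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"msg"}],"optional":false,"name":"msgschema"},"payload":{"id":222,"msg":"hi"}}' | \
  ./bin/kafka-console-producer --broker-list localhost:9092 --topic test-mysql-jdbc-foobar

mysql -u user1 -puser1pass demo -e 'SELECT * FROM `test-mysql-jdbc-foobar`;'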