I'm using the Kafka Connect JDBC Sink Connector to store data from topics in a SQL Server table. The data needs to be flattened. I've created a SQL Server table and a JSON record based on the example provided by Confluent.

So my record is this one:

{
    "payload":{ 
        "id": 42,
        "name": {
          "first": "David"
        }
    },
    "schema": {
        "fields": [
            {
                "field": "id",
                "optional": true,
                "type": "int32"
            },
            {
                "name": "name",
                "optional": "false",
                "type": "struct",
                "fields": [
                    {
                        "field": "first",
                        "optional": true,
                        "type": "string"
                    }
                ]
            }
        ],
        "name": "Test",
        "optional": false,
        "type": "struct"
    }   
}

As you can see, I want to flatten the nested fields, concatenating their names with the delimiter "_". So my Sink Connector configuration is as follows:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
table.name.format=MyTable
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
topics=myTopic
tasks.max=1
transforms=flatten
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=jdbc:sqlserver:[url]
transforms.flatten.delimiter=_
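
To make the goal concrete: with the "_" delimiter, the Flatten transform should turn the record above into a flat value along these lines (a sketch; it implies the target table needs the columns id and name_first):

{
    "id": 42,
    "name_first": "David"
}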

When I write that record in the topic, I get the following exception:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schema's field name not specified properly
    at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:512)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:360)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more

With records that don't require flattening, the sink connector works fine. Is there anything wrong with my configuration? Is it possible to flatten JSON records that carry an inline schema?

P.S. Kafka Connect version: 5.3.0-ccs

Any help would be greatly appreciated.

1 Answer

OK, the problem was the key used to declare the nested field's name in the schema. The correct key is "field", not "name":

{
    "payload":{ 
        "id": 42,
        "name": {
          "first": "David"
        }
    },
    "schema": {
        "fields": [
            {
                "field": "id",
                "optional": true,
                "type": "int32"
            },
            {
                **"field": "name",**
                "optional": "false",
                "type": "struct",
                "fields": [
                    {
                        "field": "first",
                        "optional": true,
                        "type": "string"
                    }
                ]
            }
        ],
        "name": "Test",
        "optional": false,
        "type": "struct"
    }   
}
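
For anyone who hits the same exception: in the JsonConverter's envelope format, each entry of a struct's "fields" array must declare the field name under the key "field"; the key "name" only names the schema (or nested struct) itself. When "field" is missing, the converter fails with the "Struct schema's field name not specified properly" error from the stack trace above. With the corrected schema, the record deserializes and the Flatten transform produces the flat fields id and name_first as intended.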