
I'm using the MongoDB sink connector and trying to use the Flatten SMT. My Kafka message looks like this:

{
    "entity": {
        "_id": "62ade225a28f13001127297a",
        "status": "Critical",
        "severity": "Critical",
        "is_active": true,
        "source_system": "api.test",
        "tags": [
            {
                "type": "host",
                "value": "host123"
            },
            {
                "type": "check",
                "value": "check123"
            }
        ]
    }
}

My main goal is to flatten the entity object in order to keep only a few fields, for example:

{
    "_id": "62ade225a28f13001127297a",
    "status": "Critical",
    "severity": "Critical",
    "is_active": true
}
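For illustration only, the projection described above can be sketched in plain Java. It lifts a handful of fields out of entity and drops the rest, including tags, operating on the nested Map/List values that a schemaless record carries. This is not an SMT and not part of any connector API. The class name EntityProjection and the KEPT_FIELDS list are hypothetical, and the field list is taken from the example output.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the desired output; not a Kafka Connect transformation.
class EntityProjection {

    // Fields to keep from the nested "entity" object (from the example output above).
    static final List<String> KEPT_FIELDS = List.of("_id", "status", "severity", "is_active");

    // Returns a new map with only the kept fields of value.get("entity"), lifted to the
    // top level in KEPT_FIELDS order. Fields missing from the entity are skipped.
    @SuppressWarnings("unchecked")
    static Map<String, Object> project(Map<String, Object> value) {
        Map<String, Object> entity = (Map<String, Object>) value.get("entity");
        Map<String, Object> result = new LinkedHashMap<>();
        if (entity == null) {
            return result;
        }
        for (String field : KEPT_FIELDS) {
            if (entity.containsKey(field)) {
                result.put(field, entity.get(field));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The message above, modeled as nested Java collections (assumed schemaless shape).
        Map<String, Object> entity = new LinkedHashMap<>();
        entity.put("_id", "62ade225a28f13001127297a");
        entity.put("status", "Critical");
        entity.put("severity", "Critical");
        entity.put("is_active", true);
        entity.put("source_system", "api.test");
        entity.put("tags", List.of(
                Map.of("type", "host", "value", "host123"),
                Map.of("type", "check", "value", "check123")));
        Map<String, Object> value = Map.of("entity", entity);
        System.out.println(project(value));
    }
}
```

Running main prints {_id=62ade225a28f13001127297a, status=Critical, severity=Critical, is_active=true}.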

My connector configuration is:

{
    "name": "kafka-to-mongo",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": "1",
        "topics.regex": "test_topic",
        "connection.uri": "mongodb://mongodb4:27017/auto-resolve",
        "database": "auto-resolve",
        "value.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "collection": "autoresolve_schedules",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
        "document.id.strategy.partial.value.projection.list": "entity._id",
        "document.id.strategy.partial.value.projection.type": "AllowList",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
        "transforms": "HoistField,ReplaceField,flatten",
        "transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Key",
        "transforms.HoistField.field": "key",
        "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.ReplaceField.whitelist": "entity",
        "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
        "transforms.flatten.delimiter": "."
    }
} 

I'm getting this error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Flatten transformation does not support class java.util.ArrayList for record without schemas (for field entity.tags).
    at org.apache.kafka.connect.transforms.Flatten.applySchemaless(Flatten.java:134)
    at org.apache.kafka.connect.transforms.Flatten.applySchemaless(Flatten.java:131)
    at org.apache.kafka.connect.transforms.Flatten.applySchemaless(Flatten.java:99)
    at org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:75)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 14 more
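The stack trace shows applySchemaless calling itself before it fails on entity.tags. That recursion can be approximated with the simplified model below. It is a hypothetical sketch written for this question, not the Kafka source. It assumes that, without a schema, primitive values are copied as-is, nested maps are recursed into with their keys joined by the delimiter, and any other type is rejected. The DataException message suggests this rejection is what happens for java.util.ArrayList.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of schemaless flattening; NOT the Kafka Flatten source.
class SchemalessFlattenModel {

    // Copies entries of `original` into `out`, joining nested keys with `delimiter`.
    // Scalars (and nulls) are copied, nested maps are recursed into, and any other
    // type (for example a List) is rejected, mirroring the DataException above.
    static void flatten(Map<String, Object> original, String prefix, String delimiter,
                        Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : original.entrySet()) {
            String name = prefix.isEmpty() ? entry.getKey() : prefix + delimiter + entry.getKey();
            Object value = entry.getValue();
            if (value == null || value instanceof String
                    || value instanceof Number || value instanceof Boolean) {
                out.put(name, value);
            } else if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                flatten(nested, name, delimiter, out);
            } else {
                throw new IllegalArgumentException(
                        "unsupported " + value.getClass() + " for field " + name);
            }
        }
    }

    // Entry point: flattens a whole record value into a new insertion-ordered map.
    static Map<String, Object> flatten(Map<String, Object> value, String delimiter) {
        Map<String, Object> out = new LinkedHashMap<>();
        flatten(value, "", delimiter, out);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> entity = new LinkedHashMap<>();
        entity.put("_id", "62ade225a28f13001127297a");
        entity.put("status", "Critical");
        // An ArrayList, matching the class named in the error message.
        entity.put("tags", new ArrayList<>(List.of(Map.of("type", "host", "value", "host123"))));
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("entity", entity);
        try {
            flatten(value, ".");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints "unsupported class java.util.ArrayList for field entity.tags". The scalar fields before tags are handled and the array is not, just as in the real error.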

What am I doing wrong? I also wanted to investigate why I get this error, so I cloned the Flatten class from the Kafka repo and added a test that takes my message (a JSON string), converts it with the Jackson ObjectMapper, and then passes it to the apply method of the Flatten class. Surprisingly, I didn't get any exceptions, and the transformer worked as expected in the JUnit test.
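The same input failed in the Connect worker but passed in the test. One thing worth ruling out is that the two environments load different builds of the Flatten class; this is an assumption, not something the question establishes. The small stdlib-only helper below reports which jar or directory a class was loaded from. Running it on the worker's classpath and on the test's classpath lets you compare the two. The class name ClassOrigin is hypothetical.

```java
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical diagnostic helper: reports where the JVM loaded a class from.
class ClassOrigin {

    // Returns the code-source location (jar or directory) of `className`, or
    // Optional.empty() if the class is not on the classpath or has no location
    // (for example, JDK classes loaded by the bootstrap loader).
    static Optional<String> locate(String className) {
        try {
            // initialize = false: only load the class, don't run its static initializers.
            Class<?> klass = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = klass.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return Optional.empty();
            }
            return Optional.of(source.getLocation().toString());
        } catch (ClassNotFoundException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String name = "org.apache.kafka.connect.transforms.Flatten";
        System.out.println(name + " -> " + locate(name).orElse("not found on this classpath"));
    }
}
```

On a classpath without the Kafka transforms jar, main reports that the class was not found. Otherwise it prints the jar path, which can then be checked against the Kafka version the test was cloned from.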

Do you have a schema for this topic? – Gibbs
No, @Gibbs. Why? – oy121