
I'm trying to use the Confluent InfluxDB sink connector to get data from a topic into my InfluxDB. The configuration looks like this:

connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=https://mydb
topics=mytopic
tasks.max=1

All I get when I create the new connector through the Kafka Connect UI is the following exception:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
    at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more

The values in the topic are JSON strings like this: {"pid":1,"filename":"test1.csv"}. Is there any configuration I'm missing here?
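
For what it's worth, the messages are produced with a plain producer, along these lines (a sketch, not my exact code; the broker address is a placeholder):

import json
from confluent_kafka import Producer  # assumes the confluent-kafka Python client

producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder broker
producer.produce("mytopic", value=json.dumps({"pid": 1, "filename": "test1.csv"}))
producer.flush()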

Update: Here is my worker configuration:

config.storage.topic=kafka-connect-my-config
rest.port=28082
group.id=kafka-connect-mygroup
plugin.path=/usr/share/java,/connect-plugins
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.topic=kafka-connect-my-offsets
bootstrap.servers={my broker urls}
value.converter=org.apache.kafka.connect.storage.StringConverter
status.storage.topic=kafka-connect-my-status
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.advertised.host.name=kafka-development-kafka-connect-1
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
Can you share your worker properties file too? - Robin Moffatt

1 Answer


The InfluxDB connector requires a schema to be present in the data. Your worker config uses StringConverter, so the sink task receives a plain java.lang.String, which is exactly the ClassCastException you're seeing. If you have JSON data, you need to set:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
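
Since your worker config sets value.converter globally, you can also override the converter per connector. A minimal sketch, reusing the connector config from your question:

connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=https://mydb
topics=mytopic
tasks.max=1
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true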

BUT your JSON needs to include the schema, so instead of

{"pid":1,"filename":"test1.csv"}

you would need something like

{
    "schema": {
        "type": "struct", "optional": false, "version": 1, "fields": [
            { "field": "pid", "type": "string", "optional": true },
            { "field": "filename", "type": "string", "optional": true }
        ]
    },
    "payload": {
        "pid": 1,
        "filename": "test1.csv"
    }
}
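
As a quick test, you could publish a schema-embedded message like this (a sketch using the confluent-kafka Python client; the broker address is a placeholder):

import json
from confluent_kafka import Producer  # assumes the confluent-kafka Python client

message = {
    "schema": {
        "type": "struct", "optional": False, "version": 1, "fields": [
            {"field": "pid", "type": "int32", "optional": True},
            {"field": "filename", "type": "string", "optional": True},
        ],
    },
    "payload": {"pid": 1, "filename": "test1.csv"},
}

producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder broker
producer.produce("mytopic", value=json.dumps(message))
producer.flush()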

Ref: https://rmoff.net/2020/01/23/notes-on-getting-data-into-influxdb-from-kafka-with-kafka-connect/

For more details on how to apply the schema to your data, see this blog.

For more details on converters in general, see this article.