0 votes

I set up a Kafka JDBC sink connector to send events to a PostgreSQL database. I wrote this simple producer that sends JSON with schema (Avro) data to a topic as follows:

producer.py (kafka-python)

biometrics = {
    "heartbeat": self.pulse,        # integer
    "oxygen": self.oxygen,          # integer
    "temprature": self.temprature,  # float
    "time": time                    # string
}

avro_value = {
    "schema": open(BASE_DIR + "/biometrics.avsc").read(),
    "payload": biometrics
}

producer.send(
    "biometrics",
    key="some_string",
    value=avro_value
)

Value Schema:

{
    "type": "record",
    "name": "biometrics",
    "namespace": "athlete",
    "doc": "athletes biometrics"
    "fields": [
        {
            "name": "heartbeat",
            "type": "int",
            "default": 0
        },
        {
            "name": "oxygen",
            "type": "int",
            "default": 0
        },
        {
            "name": "temprature",
            "type": "float",
            "default": 0.0
        },
        {
            "name": "time",
            "type": "string"
            "default": ""
        }
    ]
}

Connector config (without hosts, passwords etc)

{
    "name": "jdbc_sink",
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter ",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "biometrics",
    "insert.mode": "insert",
    "auto.create": "true"
}

But my connector fails hard with three errors, and I am unable to spot the reason for any of them:

TL;DR log version

(Error 1) Caused by: org.apache.kafka.connect.errors.DataException: biometrics
(Error 2) Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
(Error 3) Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Full log

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: biometrics
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:98)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:498)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Could someone help me understand those errors and the underlying reason?


1 Answer

1 vote

The error is because you need to use the JsonConverter class with value.converter.schemas.enable=true in your connector config, since that matches what was produced. However, the "schema" field you embed is an Avro schema, not the Connect-schema representation of the payload that JsonConverter expects, so it might still fail with those changes alone...
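For reference, the converter change described above would look roughly like this in the connector config (a sketch based on the config in the question; note that with schemas.enable=true the JsonConverter expects the embedded "schema" field to be a Connect schema, e.g. {"type": "struct", "fields": [...]}, not an Avro schema):

```json
{
    "name": "jdbc_sink",
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "topics": "biometrics",
    "insert.mode": "insert",
    "auto.create": "true"
}
```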

If you want to actually send Avro, then use the AvroProducer from the confluent-kafka Python library, which requires running the Schema Registry.
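To see where "Unknown magic byte!" comes from: the AvroConverter expects every message to follow the Confluent wire format, a magic byte 0x00 followed by a 4-byte big-endian schema ID, then the Avro-encoded body. Your kafka-python producer sent plain JSON, so the first byte is '{' rather than 0x00. A minimal sketch of that framing check (the schema ID and body bytes below are placeholders):

```python
import struct

def parse_confluent_header(message: bytes):
    """Split a Confluent-framed message into (schema_id, avro_body).

    Wire format: 1 magic byte (must be 0) + 4-byte big-endian schema ID,
    then the Avro-serialized payload.
    """
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("Unknown magic byte!")
    return schema_id, message[5:]

# A registry-serialized record for (hypothetical) schema ID 42:
framed = struct.pack(">bI", 0, 42) + b"<avro body>"
print(parse_confluent_header(framed))  # (42, b'<avro body>')

# Plain JSON, as produced in the question, has no such header --
# its first byte is '{' (0x7b), so the deserializer rejects it:
try:
    parse_confluent_header(b'{"schema": "..."}')
except ValueError as e:
    print(e)  # Unknown magic byte!
```

This is why the converter and the producer's serialization format must match: the converter is chosen per connector, but the framing is decided at produce time.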