
I want to send the data sent to the topic to a PostgreSQL database. So I followed this guide and configured the properties file like this:

name=transaction-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=transactions
connection.url=jdbc:postgresql://localhost:5432/db
connection.user=db-user
connection.password=
auto.create=true
insert.mode=insert
table.name.format=transaction
pk.mode=none
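
For reference, with auto.create=true the connector derives a CREATE TABLE statement from the record schema. For the two string fields in the producer code further down, the generated DDL should look roughly like this (TEXT is the JDBC sink's default PostgreSQL mapping for Avro strings; this is my sketch, not the connector's verbatim output):

```sql
CREATE TABLE "transaction" (
  "foo" TEXT,
  "bar" TEXT
);
```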

I start the connector with

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-quickstart-postgresql.properties

The sink connector is created but does not start, due to this error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

The schema is in Avro format and registered, and I can send (produce) messages to the topic and read (consume) from it. But I can't seem to send them to the database.

This is my ./etc/schema-registry/connect-avro-standalone.properties:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

This is a producer feeding the topic using the Java API:

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

try (KafkaProducer<String, Transaction> producer = new KafkaProducer<>(properties)) {
    Transaction transaction = new Transaction();
    transaction.setFoo("foo");
    transaction.setBar("bar");
    UUID uuid = UUID.randomUUID();
    final ProducerRecord<String, Transaction> record = new ProducerRecord<>(TOPIC, uuid.toString(), transaction);
    producer.send(record);
}

I'm verifying that the data is properly serialized and deserialized using

./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic transactions \
    --from-beginning --max-messages 1
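
That command only deserialises the value, though. To see how the key was written, you can dump the raw records with the plain console consumer (print.key is a standard console-consumer formatter property); a key that reads as plain text was written with StringSerializer rather than the Avro serializer:

```shell
# Print the raw key next to the value; a human-readable key indicates
# it was not written with the Schema Registry Avro serializer.
./bin/kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic transactions \
    --property print.key=true \
    --from-beginning --max-messages 1
```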

The database is up and running.

- What's your KEY_SERIALIZER_CLASS_CONFIG set to for the producer? – Robin Moffatt
- Updated my question. StringSerializer.class. – kometen
- Bingo :) I've updated my answer. – Robin Moffatt
- Awesome you are. :-) It works. I must transform the message, since I use BigDecimal (which in Avro is type string, Java class java.math.BigDecimal), to circumvent the "STRUCT type doesn't have a mapping to SQL" error. There are links I'll look at. – kometen
- After I changed key.converter I have tried to make the topic sink data to the PostgreSQL table. A seemingly straightforward task, since I don't have nested data. But it stops with the error "Unsupported source data type: STRUCT". I have looked at docs.confluent.io/3.1.1/connect/connect-jdbc/docs/… and stackoverflow.com/questions/44385722/… but the latter seems unrelated. The properties are at the top. Can I edit with the error? – kometen

1 Answer


This is not correct:

The unknown magic byte can be due to a id-field not part of the schema

What that error means is that the message on the topic was not serialised using the Schema Registry Avro serialiser.
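
For context, the "magic byte" is the first byte of Confluent's wire format: a Schema Registry-serialised value starts with byte 0x0, followed by a 4-byte schema ID, then the Avro payload. A message written with a plain serialiser (e.g. StringSerializer) has no such prefix, which is exactly what triggers the error. A minimal sketch of that framing check (the class and method names here are mine, not part of any Kafka API):

```java
public class WireFormatCheck {
    // Confluent Schema Registry wire format:
    //   byte 0:    magic byte (always 0x0)
    //   bytes 1-4: schema ID (big-endian int)
    //   bytes 5+:  Avro-encoded payload
    static boolean looksLikeConfluentAvro(byte[] value) {
        return value != null && value.length >= 5 && value[0] == 0x0;
    }

    public static void main(String[] args) {
        byte[] framed = {0, 0, 0, 0, 1, 42};       // magic byte + schema ID 1 + payload
        byte[] plain = "just a string".getBytes(); // no framing at all
        System.out.println(looksLikeConfluentAvro(framed)); // true
        System.out.println(looksLikeConfluentAvro(plain));  // false
    }
}
```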

How are you putting data on the topic?

Maybe all the messages have the problem, or maybe only some, but by default this will halt the Kafka Connect task.

You can set

"errors.tolerance":"all",

to get it to ignore messages that it can't deserialise. But if none of them are correctly Avro-serialised, this won't help; you need to serialise them correctly, or choose a different converter (e.g. if they're actually JSON, use the JsonConverter).
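
In the same vein, rather than silently skipping bad records, you can route them to a dead letter queue (these are the standard Connect error-handling properties, available since Apache Kafka 2.0; the DLQ topic name here is just an example):

```
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=dlq_transactions
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
```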

Edit:

If you are serialising the key with StringSerializer then you need to use this in your Connect config:

key.converter=org.apache.kafka.connect.storage.StringConverter

You can set it at the worker level (a global property that applies to all connectors running on that worker), or just for this connector (i.e. put it in the connector properties themselves, where it overrides the worker setting).
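
Putting it together for this connector, the converter overrides could look like this in the sink's own properties file (the values mirror the question's setup):

```
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```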