
I'm trying to send a message to my broker using an Avro schema, but I'm always getting this error:

2020-02-01 11:24:37.189 [nioEventLoopGroup-4-1] ERROR Application - Unhandled: POST - /api/orchestration/ org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string" Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409

Here is my docker-compose service:

 connect:
    image: confluentinc/cp-kafka-connect:5.4.0
    hostname: confluentinc-connect
    container_name: confluentinc-connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: confluentinc-connect
      CONNECT_CONFIG_STORAGE_TOPIC: confluentinc-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: confluentinc-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: confluentinc-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/extras"

My producer (written in Kotlin):

 val prop: HashMap<String, Any> = HashMap()
    prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
    prop[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    prop[VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
    prop[SCHEMA_REGISTRY_URL] = schemaUrl
    prop[ENABLE_IDEMPOTENCE_CONFIG] = idempotence
    prop[ACKS_CONFIG] = acks.value
    prop[RETRIES_CONFIG] = retries
    prop[MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION] = requestPerConnection
    prop[COMPRESSION_TYPE_CONFIG] = compression.value
    prop[LINGER_MS_CONFIG] = linger
    prop[BATCH_SIZE_CONFIG] = batchSize.value

    return KafkaProducer(prop)
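As a side note (not part of the original question): with KafkaAvroSerializer and the registry's default TopicNameStrategy, the value schema is registered under the subject `<topic>-value`, which is the subject name the 409 error refers to. A minimal sketch, where the topic name is a placeholder:

```python
# Default TopicNameStrategy: subject names are derived from the topic name.
# The topic name here is hypothetical; substitute your own.
topic = "create-client"

value_subject = f"{topic}-value"  # subject the value schema is registered under
key_subject = f"{topic}-key"      # subject the key schema is registered under

print(value_subject)  # -> create-client-value
```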

My Avro Schema:

{
    "type": "record",
    "namespace": "com.rjdesenvolvimento",
    "name": "create_client_value",
    "doc": "Avro Schema for Kafka Command",
    "fields": [
        {
            "name": "id",
            "type": "string",
            "logicalType": "uuid",
            "doc": "UUID for indentifaction command"
        },
        {
            "name": "status",
            "type": {
                "name": "status",
                "type": "enum",
                "symbols": [
                    "Open",
                    "Closed",
                    "Processing"
                ],
                "doc": "Can be only: Open, Closed or Processing"
            },
            "doc": "Status of the command"
        },
        {
            "name": "message",
            "type": {
                "type": "record",
                "name": "message",
                "doc": "Avro Schema for insert new client",
                "fields": [
                    {
                        "name": "id",
                        "type": "string",
                        "logicalType": "uuid",
                        "doc": "UUID for indentifaction client transaction"
                    },
                    {
                        "name": "active",
                        "type": "boolean",
                        "doc": "Soft delete for client"
                    },
                    {
                        "name": "name",
                        "type": "string",
                        "doc": "Name of the client"
                    },
                    {
                        "name": "email",
                        "type": "string",
                        "doc": "Email of the client"
                    },
                    {
                        "name": "document",
                        "type": "string",
                        "doc": "CPF or CPNJ of the client"
                    },
                    {
                        "name": "phones",
                        "doc": "A list of phone numbers",
                        "type": {
                            "type": "array",
                            "items": {
                                "name": "phones",
                                "type": "record",
                                "fields": [
                                    {
                                        "name": "id",
                                        "type": "string",
                                        "logicalType": "uuid",
                                        "doc": "UUID for indentifaction of phone transaction"
                                    },
                                    {
                                        "name": "active",
                                        "type": "boolean",
                                        "doc": "Soft delete for phone number"
                                    },
                                    {
                                        "name": "number",
                                        "type": "string",
                                        "doc": "The phone number with this regex +xx xx xxxx xxxx"
                                    }
                                ]
                            }
                        }
                    },
                    {
                        "name": "address",
                        "type": "string",
                        "logicalType": "uuid",
                        "doc": "Adrres is an UUID for a other address-microservice"
                    }
                ]
            }
        }
    ]
}

And my POST body:

{       
      "id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
        "status" : "Open",
        "message": {
            "id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
            "active" : true,
             "name": "name",
             "email": "email@com",
             "document": "document",
             "phones": [
                 {
                     "id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
                        "active" : true,
                     "number": "+xx xx xxxx xxxx"
                 },
                    {
                     "id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
                        "active" : true,
                     "number": "+xx xx xxxx xxxx"
                 }
             ],
             "address": "9ec818da-6ee0-4634-9ed8-c085248cae12"  
        }   
}

What am I doing wrong? github project: https://github.com/rodrigodevelms/kafka-registry

UPDATE =====

Briefly: I'm not generating my classes with the Gradle Avro plugin. In this example, my POST sends a Client object, and the service assembles a Command-type object as follows:

id: same client id

status: open

message: the POST that was sent.

So I send this to Kafka, and in Kafka Connect (JDBC sink to Postgres) I set fields.whitelist to only the attributes of the message (the client), so I pick up neither the command id nor the status.

On GitHub, the only classes that matter for understanding the code are:

1 - https://github.com/rodrigodevelms/kafka-registry/blob/master/kafka/src/main/kotlin/com/rjdesenvolvimento/messagebroker/producer/Producer.kt

2 - https://github.com/rodrigodevelms/kafka-registry/blob/master/kafka/src/main/kotlin/com/rjdesenvolvimento/messagebroker/commnad/Command.kt

3 - https://github.com/rodrigodevelms/kafka-registry/blob/master/src/client/Controller.kt

4 - https://github.com/rodrigodevelms/kafka-registry/blob/master/src/client/Service.kt

5 - docker-compose.yml, insert-client-value.avsc, postgresql.json

If I set the subject's compatibility mode to "none", I can send a message, but the stored value then shows unknown (garbled) characters.

Comments:

"Note: CONNECT_ZOOKEEPER_CONNECT is not a valid property." – OneCricketeer

"And you duplicated the STORAGE_REPLICATION_FACTOR properties, and CONVERTER_SCHEMAS_ENABLE is not valid for any converter except JSON." – OneCricketeer

"Thanks for the tips, but the error persists." – Rodrigo Batista

"Those weren't solutions to your problem, just suggestions to clean up the config." – OneCricketeer

"I've been trying to find the error for almost a whole day, but I couldn't. If possible, could you look at the GitHub repository? I appreciate the help attempt. =)" – Rodrigo Batista

1 Answer


I suspect you're trying multiple things and haven't been cleaning up state between attempts. You should not get that error on a fresh installation.

Schema being registered is incompatible with an earlier schema

Your schema has changed in a way that is incompatible with the version already registered under that subject in the registry.
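Before deleting anything, you can ask the registry itself whether the schema you are sending is compatible with the latest registered version. A sketch using only the Python standard library; the subject name and the trimmed schema below are assumptions, not taken from the post:

```python
import json
import urllib.request

# Hypothetical subject; with the default TopicNameStrategy it is "<topic>-value".
subject = "create-client-value"

# Trimmed stand-in for the schema you are trying to register.
new_schema = {
    "type": "record",
    "name": "create_client_value",
    "namespace": "com.rjdesenvolvimento",
    "fields": [{"name": "id", "type": "string"}],
}

# The registry expects the schema as an escaped JSON string in a "schema" field.
payload = json.dumps({"schema": json.dumps(new_schema)}).encode()

req = urllib.request.Request(
    f"http://schema-registry:8081/compatibility/subjects/{subject}/versions/latest",
    data=payload,
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    method="POST",
)
# urllib.request.urlopen(req)  # response body: {"is_compatible": true|false}
```

If the response says the schema is incompatible, the registry's error message usually names the offending field, which is much faster than guessing.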

You can send an HTTP DELETE request to http://registry:8081/subjects/[name]/ to delete all versions of that schema, then restart your connector.
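For illustration, the DELETE request can be built like this with the Python standard library; the subject name is a hypothetical stand-in for your actual `<topic>-value` subject:

```python
import urllib.request

# Hypothetical subject name; with TopicNameStrategy it is "<topic>-value".
subject = "create-client-value"

# DELETE /subjects/{subject} removes every registered version for that subject.
req = urllib.request.Request(
    f"http://schema-registry:8081/subjects/{subject}",
    method="DELETE",
)
# urllib.request.urlopen(req)  # uncomment to actually send the request
```

Deleting the subject resets its compatibility history, so the next schema you register becomes version 1 again.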