3
votes

I have recently started exploring Kafka and Kafka Connect and did some initial setup, but I wanted to explore the Schema Registry part more.

My Schema Registry is started; what should I do now?

I have an Avro schema stored in avro_schema.avsc.

Here is the schema:

{
  "name": "FSP-AUDIT-EVENT",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "ID",
      "type": "string"
    },
    {
      "name": "VERSION",
      "type": "int"
    },
    {
      "name": "ACTION_TYPE",
      "type": "string"
    },
    {
      "name": "EVENT_TYPE",
      "type": "string"
    },
    {
      "name": "CLIENT_ID",
      "type": "string"
    },
    {
      "name": "DETAILS",
      "type": "string"
    },
    {
      "name": "OBJECT_TYPE",
      "type": "string"
    },
    {
      "name": "UTC_DATE_TIME",
      "type": "long"
    },
    {
      "name": "POINT_IN_TIME_PRECISION",
      "type": "string"
    },
    {
      "name": "TIME_ZONE",
      "type": "string"
    },
    {
      "name": "TIMELINE_PRECISION",
      "type": "string"
    },
    {
      "name": "AUDIT_EVENT_TO_UTC_DT",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "AUDIT_EVENT_TO_DATE_PITP",
      "type": "string"
    },
    {
      "name": "AUDIT_EVENT_TO_DATE_TZ",
      "type": "string"
    },
    {
      "name": "AUDIT_EVENT_TO_DATE_TP",
      "type": "string"
    },
    {
      "name": "GROUP_ID",
      "type": "string"
    },
    {
      "name": "OBJECT_DISPLAY_NAME",
      "type": "string"
    },
    {
      "name": "OBJECT_ID",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "USER_DISPLAY_NAME",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "USER_ID",
      "type": "string"
    },
    {
      "name": "PARENT_EVENT_ID",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "NOTES",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "SUMMARY",
      "type": [
        "string",
        "null"
      ]
    }
  ]
}

Is my schema valid? I converted it online from JSON. Where should I keep this schema file? I am not sure about its location. Please guide me through the steps to follow. I am sending records both from a Lambda function and from a JDBC source.

So basically, how can I enforce the Avro schema and test it? Do I have to change anything in the avro-consumer properties file?

Or is this the correct way to register a schema?

    ./bin/kafka-avro-console-producer \
        --broker-list b-3.**:9092,b-**:9092,b-**:9092 --topic AVRO-AUDIT_EVENT \
        --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Or by posting the schema directly to the Schema Registry with curl:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json"     --data '{"schema" : "{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"VERSION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"ACTION_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"EVENT_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"CLIENT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"DETAILS\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"UTC_DATE_TIME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"POINT_IN_TIME_PRECISION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TIME_ZONE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TIMELINE_PRECISION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"GROUP_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_DISPLAY_NAME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"USER_DISPLAY_NAME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"USER_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"PARENT_EVENT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"NOTES\"},{\"type\":\"string\",\"optional\":true,\"field\":\"SUMMARY\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_UTC_DT\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_PITP\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_TZ\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_TP\"}],\"optional\":false,\"name\":\"test\"}"}' http://localhost:8081/subjects/view/versions

What do I have to do next?

But when I try to see my schema, I only get the following.

curl --silent -X GET http://localhost:8081/subjects/AVRO-AUDIT-EVENT/versions/latest

This is the result:

{"subject":"AVRO-AUDIT-EVENT","version":1,"id":161,"schema":"{\"type\":\"string\",\"optional\":false}"}

Why do I not see my full registered schema?

Also, when I try to delete the schema, I get the error below:

{"error_code":405,"message":"HTTP 405 Method Not Allowed"}

I am not sure if my schema is registered correctly.

Please help me. Thanks in advance.


1 Answer

2
votes

Is my schema valid?

You can use the REST API of the Registry to try and submit it and see...
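
For example, something like this (a sketch, not tested against your setup; it assumes jq 1.6+ to escape the schema file, the Registry on localhost:8081, and AVRO-AUDIT_EVENT-value, the subject the Avro serializer would use by default for that topic's values):

    # Wrap the raw .avsc contents in the Registry's {"schema": "..."} envelope and POST it;
    # the Registry parses the Avro and rejects the request if the schema is not valid.
    jq -n --rawfile s avro_schema.avsc '{schema: $s}' | \
      curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
           --data @- \
           http://localhost:8081/subjects/AVRO-AUDIT_EVENT-value/versions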

Where should I keep this schema file? I am not sure about its location.

It's not clear how you're sending messages...

If you actually wrote Kafka producer code, you store the schema within your code (as a string) or as a resource file. If using Java, you can instead use the SchemaBuilder class to create the Schema object.

You need to rewrite your producer to use the Avro schema and serializers if you have not already.

If we create an Avro schema, will it work for JSON as well?

Avro is a binary format, but there is a JSON decoder for it.
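
For example, the avro-tools jar can convert JSON records to Avro and back (a sketch; the version number and file names are placeholders, and union fields must use Avro's JSON encoding, e.g. {"string": "value"}):

    # JSON records -> Avro data file, validated against the schema.
    java -jar avro-tools-1.11.1.jar fromjson --schema-file avro_schema.avsc records.json > records.avro

    # Avro data file -> JSON, to inspect what was written.
    java -jar avro-tools-1.11.1.jar tojson records.avro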

What should the URL be in our Avro schema properties file?

It needs to be the address of your Schema Registry once you figure out how to start it (with schema-registry-start).
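
For example, with a standard Confluent Platform layout (the paths here are assumptions):

    # Start the Registry with its bundled properties file (listens on port 8081 by default).
    ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

    # Quick check that it is reachable.
    curl http://localhost:8081/subjects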

Do I have to change anything in the avro-consumer properties file?

You need to use the Avro deserializer.
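
If you are testing from the command line, kafka-avro-console-consumer already wires in the Avro deserializer; a sketch, reusing your (elided) broker address and assuming the Registry on localhost:8081:

    ./bin/kafka-avro-console-consumer \
        --bootstrap-server b-3.**:9092 \
        --topic AVRO-AUDIT_EVENT \
        --from-beginning \
        --property schema.registry.url=http://localhost:8081

In your own consumer code, the equivalent is setting value.deserializer to io.confluent.kafka.serializers.KafkaAvroDeserializer and schema.registry.url to the same address.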

Is this the correct way to register a schema?

./bin/kafka-avro-console-producer ...

Not quite. That's how you produce a message with a schema (and you need to use the correct schema). You must also provide --property schema.registry.url.
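
Something closer to this (a sketch; the Registry address is an assumption, and the broker list is copied, still elided, from your command):

    ./bin/kafka-avro-console-producer \
        --broker-list b-3.**:9092,b-**:9092,b-**:9092 \
        --topic AVRO-AUDIT_EVENT \
        --property schema.registry.url=http://localhost:8081 \
        --property value.schema="$(cat avro_schema.avsc)"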

You use the REST API of the Registry to register and verify schemas.
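
For example (against a local Registry; substitute your own subject name):

    # List subjects and the versions registered under one of them.
    curl http://localhost:8081/subjects
    curl http://localhost:8081/subjects/AVRO-AUDIT_EVENT-value/versions

    # Fetch a version to verify the full schema was stored.
    curl http://localhost:8081/subjects/AVRO-AUDIT_EVENT-value/versions/latest

    # Deleting requires the DELETE method, on a single version or on the whole subject.
    curl -X DELETE http://localhost:8081/subjects/AVRO-AUDIT_EVENT-value/versions/1
    curl -X DELETE http://localhost:8081/subjects/AVRO-AUDIT_EVENT-value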