0 votes

I was following a tutorial on Kafka Connect, and I am wondering if there is a way to define a custom schema registry for a topic whose data comes from a MySQL table.

I can't find where to define it in my JSON/Connect config, and I don't want to create a new version of that schema after it has been created.

My MySQL table, called stations, has this schema:

Field          | Type        
---------------+-------------
code           | varchar(4)  
date_measuring | timestamp   
attributes     | varchar(256)

where attributes contains JSON data rather than a plain String (I have to use that column type because the JSON fields inside attributes are variable).

My connector config is:

{
  "value.converter.schema.registry.url": "http://localhost:8081",
  "_comment": "The Kafka topic will be made up of this prefix, plus the table name  ",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "name": "jdbc_source_mysql_stations",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "transforms": [
    "ValueToKey"
  ],
  "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.ValueToKey.fields": [
    "code",
    "date_measuring"
  ],
  "connection.url": "jdbc:mysql://localhost:3306/db_name?useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC",
  "connection.user": "confluent",
  "connection.password": "**************",
  "table.whitelist": [
    "stations"
  ],
  "mode": "timestamp",
  "timestamp.column.name": [
    "date_measuring"
  ],
  "validate.non.null": "false",
  "topic.prefix": "mysql-"
}

and it creates this schema:

{
  "subject": "mysql-stations-value",
  "version": 1,
  "id": 23,
  "schema": "{\"type\":\"record\",\"name\":\"stations\",\"fields\":[{\"name\":\"code\",\"type\":\"string\"},{\"name\":\"date_measuring\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"attributes\",\"type\":\"string\"}],\"connect.name\":\"stations\"}"
}

Where "attributes" field is of course a String. Unlike I would apply it this other schema.

{
  "fields": [
    {
      "name": "code",
      "type": "string"
    },
    {
      "name": "date_measuring",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "connect.version": 1,
        "logicalType": "timestamp-millis",
        "type": "long"
      }
    },
    {
      "name": "attributes",
      "type": {
        "type": "record",
        "name": "AttributesRecord",
        "fields": [
          {
            "name": "H1",
            "type": "long",
            "default": 0
          },
          {
            "name": "H2",
            "type": "long",
            "default": 0
          },
          {
            "name": "H3",
            "type": "long",
            "default": 0
          },          
          {
            "name": "H",
            "type": "long",
            "default": 0
          },          
          {
            "name": "Q",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P1",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P2",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P3",
            "type": "long",
            "default": 0
          },                    
          {
            "name": "P",
            "type": "long",
            "default": 0
          },          
          {
            "name": "T",
            "type": "long",
            "default": 0
          },          
          {
            "name": "Hr",
            "type": "long",
            "default": 0
          },          
          {
            "name": "pH",
            "type": "long",
            "default": 0
          },          
          {
            "name": "RX",
            "type": "long",
            "default": 0
          },          
          {
            "name": "Ta",
            "type": "long",
            "default": 0
          },  
          {
            "name": "C",
            "type": "long",
            "default": 0
          },                  
          {
            "name": "OD",
            "type": "long",
            "default": 0
          },          
          {
            "name": "TU",
            "type": "long",
            "default": 0
          },          
          {
            "name": "MO",
            "type": "long",
            "default": 0
          },          
          {
            "name": "AM",
            "type": "long",
            "default": 0
          },          
          {
            "name": "N03",
            "type": "long",
            "default": 0
          },          
          {
            "name": "P04",
            "type": "long",
            "default": 0
          },          
          {
            "name": "SS",
            "type": "long",
            "default": 0
          },          
          {
            "name": "PT",
            "type": "long",
            "default": 0
          }          
        ]
      }
    }
  ],
  "name": "stations",
  "namespace": "com.mycorp.mynamespace",
  "type": "record"
}

Any suggestions, please? If it's not possible, I suppose I'll have to create a Kafka Streams application to produce to another topic, even though I would rather avoid that.

Thanks in advance!


1 Answer

1 vote

I don't think you're really asking about using a "custom" registry (that's what the two schema.registry.url lines in your config already point to), but rather how you can parse the data and apply a richer schema after the record is pulled from the database.

You can write your own Transform (SMT), or you can use Kafka Streams; those are really the main options here. There is a SetSchemaMetadata transform, but I'm not sure it will do what you want (it sets schema metadata such as the name and version rather than parsing a string into an Avro record).
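
If you go the custom Transform route, a bare-bones sketch could look like the following. The class and package names are hypothetical, and only two of the attribute fields are shown; the remaining ones follow the same pattern:

package com.mycorp.kafka.transforms;  // hypothetical package

import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

// Parses the JSON string held in the "attributes" field into a nested Struct,
// so the AvroConverter registers a record schema instead of a plain string.
public class ParseAttributes<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Nested schema for the parsed attributes (only H1/H2 shown here)
    private static final Schema ATTRIBUTES_SCHEMA = SchemaBuilder.struct()
            .name("AttributesRecord")
            .field("H1", SchemaBuilder.int64().defaultValue(0L).build())
            .field("H2", SchemaBuilder.int64().defaultValue(0L).build())
            .build();

    // Same value schema as the source, except attributes is now a struct
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name("stations")
            .field("code", Schema.STRING_SCHEMA)
            .field("date_measuring", Timestamp.SCHEMA)
            .field("attributes", ATTRIBUTES_SCHEMA)
            .build();

    @Override
    public R apply(R record) {
        Struct oldValue = (Struct) record.value();

        JsonNode json;
        try {
            json = MAPPER.readTree(oldValue.getString("attributes"));
        } catch (Exception e) {
            throw new DataException("attributes column did not contain valid JSON", e);
        }

        Struct attributes = new Struct(ATTRIBUTES_SCHEMA)
                .put("H1", json.path("H1").asLong(0))
                .put("H2", json.path("H2").asLong(0));

        Struct newValue = new Struct(VALUE_SCHEMA)
                .put("code", oldValue.getString("code"))
                .put("date_measuring", oldValue.get("date_measuring"))
                .put("attributes", attributes);

        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                VALUE_SCHEMA, newValue, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public void close() {
    }
}

You'd then build that into a jar, drop it on the Connect worker's plugin.path, and chain it after ValueToKey in your connector config, e.g. "transforms": "ValueToKey,ParseAttributes" together with "transforms.ParseAttributes.type": "com.mycorp.kafka.transforms.ParseAttributes".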

Or, if you must shove JSON data into a single database column, maybe you shouldn't use MySQL and should instead use a document database, which has more flexible data constraints.

Otherwise, you can use BLOB rather than VARCHAR and put binary Avro data into that column, but then you'd still need a custom deserializer to read the data.
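
If you go the Kafka Streams route instead, a minimal sketch could look like the following. The topic names, application id, and the trimmed-down target schema are my assumptions; in practice you'd paste in the full AttributesRecord from your question:

// Hypothetical Kafka Streams app: read the connector's output topic, parse the
// "attributes" JSON string, and write re-shaped Avro records to a new topic.
import java.util.Map;
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class StationsReshaper {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Reduced version of the desired schema (only H1/H2 shown; add the rest the same way)
    private static final Schema TARGET_SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"stations\",\"namespace\":\"com.mycorp.mynamespace\",\"fields\":["
      + "{\"name\":\"code\",\"type\":\"string\"},"
      + "{\"name\":\"date_measuring\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
      + "{\"name\":\"attributes\",\"type\":{\"type\":\"record\",\"name\":\"AttributesRecord\",\"fields\":["
      + "{\"name\":\"H1\",\"type\":\"long\",\"default\":0},"
      + "{\"name\":\"H2\",\"type\":\"long\",\"default\":0}]}}]}");

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stations-reshaper");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Both key and value are Avro because the connector uses AvroConverter for both
        Map<String, String> serdeConfig = Map.of("schema.registry.url", "http://localhost:8081");
        GenericAvroSerde keySerde = new GenericAvroSerde();
        keySerde.configure(serdeConfig, true);
        GenericAvroSerde valueSerde = new GenericAvroSerde();
        valueSerde.configure(serdeConfig, false);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("mysql-stations", Consumed.with(keySerde, valueSerde))
               .mapValues(StationsReshaper::reshape)
               .to("mysql-stations-parsed", Produced.with(keySerde, valueSerde));

        new KafkaStreams(builder.build(), props).start();
    }

    // Parse the JSON held in the "attributes" string and rebuild the value
    // against the richer target schema
    private static GenericRecord reshape(GenericRecord in) {
        Schema attrsSchema = TARGET_SCHEMA.getField("attributes").schema();
        GenericRecord attrs = new GenericData.Record(attrsSchema);
        try {
            JsonNode json = MAPPER.readTree(in.get("attributes").toString());
            for (Schema.Field f : attrsSchema.getFields()) {
                attrs.put(f.name(), json.path(f.name()).asLong(0));
            }
        } catch (Exception e) {
            throw new RuntimeException("attributes column did not contain valid JSON", e);
        }
        GenericRecord out = new GenericData.Record(TARGET_SCHEMA);
        out.put("code", in.get("code"));
        out.put("date_measuring", in.get("date_measuring"));
        out.put("attributes", attrs);
        return out;
    }
}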