0
votes

I want to create a stream from a Kafka topic that monitors a MySQL table. The MySQL table has columns of type decimal(16,4), and when I create the stream with this command: create stream test with (KAFKA_TOPIC='dbServer.Kafka.DailyUdr',VALUE_FORMAT='AVRO'); the stream is created and runs, but the columns of type decimal(16,4) don't appear in the resulting stream.


    source topic value schema:
    {
      "type": "record",
      "name": "Envelope",
      "namespace": "dbServer.Kafka.DailyUdr",
      "fields": [
        {
          "name": "before",
          "type": [
            "null",
            {
              "type": "record",
              "name": "Value",
              "fields": [
                {
                  "name": "UserId",
                  "type": "int"
                },
                {
                  "name": "NationalCode",
                  "type": "string"
                },
                {
                  "name": "TotalInputOcted",
                  "type": "int"
                },
                {
                  "name": "TotalOutputOcted",
                  "type": "int"
                },
                {
                  "name": "Date",
                  "type": "string"
                },
                {
                  "name": "Service",
                  "type": "string"
                },
                {
                  "name": "decimalCol",
                  "type": [
                    "null",
                    {
                      "type": "bytes",
                      "scale": 4,
                      "precision": 16,
                      "connect.version": 1,
                      "connect.parameters": {
                        "scale": "4",
                        "connect.decimal.precision": "16"
                      },
                      "connect.name": "org.apache.kafka.connect.data.Decimal",
                      "logicalType": "decimal"
                    }
                  ],
                  "default": null
                }
              ],
              "connect.name": "dbServer.Kafka.DailyUdr.Value"
            }
          ],
          "default": null
        },
        {
          "name": "after",
          "type": [
            "null",
            "Value"
          ],
          "default": null
        },
        {
          "name": "source",
          "type": {
            "type": "record",
            "name": "Source",
            "namespace": "io.debezium.connector.mysql",
            "fields": [
              {
                "name": "version",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "connector",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "server_id",
                "type": "long"
              },
              {
                "name": "ts_sec",
                "type": "long"
              },
              {
                "name": "gtid",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "file",
                "type": "string"
              },
              {
                "name": "pos",
                "type": "long"
              },
              {
                "name": "row",
                "type": "int"
              },
              {
                "name": "snapshot",
                "type": [
                  {
                    "type": "boolean",
                    "connect.default": false
                  },
                  "null"
                ],
                "default": false
              },
              {
                "name": "thread",
                "type": [
                  "null",
                  "long"
                ],
                "default": null
              },
              {
                "name": "db",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "table",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              },
              {
                "name": "query",
                "type": [
                  "null",
                  "string"
                ],
                "default": null
              }
            ],
            "connect.name": "io.debezium.connector.mysql.Source"
          }
        },
        {
          "name": "op",
          "type": "string"
        },
        {
          "name": "ts_ms",
          "type": [
            "null",
            "long"
          ],
          "default": null
        }
      ],
      "connect.name": "dbServer.Kafka.DailyUdr.Envelope"
    }

My problem is with the decimalCol column.

1

1 Answer

0
votes

KSQL does not yet support the DECIMAL data type.

There is an issue here that you can track and upvote if you think it would be useful.