I'm using an Avro Value transform that generates a schema like the following (this is just a subset, since it's so large):

{
  "type": "record",
  "name": "Envelope",
  "namespace": "mssql.dbo.InvTR_T",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "InvTR_ID",
              "type": "int"
            },
            {
              "name": "Type_CH",
              "type": "string"
            },
            {
              "name": "CalcType_CH",
              "type": "string"
            },
            {
              "name": "ER_CST_ID",
              "type": "int"
            },
            {
              "name": "ER_REQ_ID",
              "type": "int"
            },
            {
              "name": "Vendor_ID",
              "type": "int"
            },
            {
              "name": "VendInv_VC",
              "type": "string"
            },
            {
              "name": "Status_CH",
              "type": "string"
            },
            {
              "name": "Stage_TI",
              "type": {
                "type": "int",
                "connect.type": "int16"
              }
            },
            {
              "name": "CheckOut_ID",
              "type": [
                "null",
                "int"
              ],
              "default": null
            },
            {
              "name": "ReCalcCk_LG",
              "type": "boolean"
            },
            {
              "name": "ReCalcAll_LG",
              "type": "boolean"
            },
            {
              "name": "PatMatch_LG",
              "type": "boolean"
            },
            {
              "name": "DocPatOvRd_LG",
              "type": "boolean"
            },
            {
              "name": "Locked_LG",
              "type": [
                "null",
                "boolean"
              ],
              "default": null
            },
            {
              "name": "SegErrFlag_LG",
              "type": "boolean"
            },
            {
              "name": "Hold_LG",
              "type": "boolean"
            },
            {
              "name": "Reason_ID",
              "type": [
                "null",
                {
                  "type": "int",
                  "connect.type": "int16"
                }
              ],
              "default": null
            },
            {
              "name": "HoldCom_VC",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "AllSegFin_LG",
              "type": "boolean"
            },
            {
              "name": "InvAmt_MN",
              "type": {
                "type": "bytes",
                "scale": 4,
                "precision": 19,
                "connect.version": 1,
                "connect.parameters": {
                  "scale": "4",
                  "connect.decimal.precision": "19"
                },
                "connect.name": "org.apache.kafka.connect.data.Decimal",
                "logicalType": "decimal"
              }

When I run the following to create a stream off this

CREATE STREAM stream_invtr_t_json   WITH (KAFKA_TOPIC='InvTR_T', VALUE_FORMAT='AVRO');

and then describe that stream, the schema comes out in a very strange format (the DESCRIBE output is at the end of this post). I want to use KSQL to filter out specific events and route them to the appropriate destinations. However, I can't get the pipeline Kafka topic => KSQL stream => Kafka topic => sink to work. If I create a new topic from that stream and try to consume it with a sink connector, I get

Expected Envelope for transformation, passing it unchanged

and then an error about the PK missing. I tried removing the unwrap transformation just to see how it would come out, and received errors with that too.

BEFORE  | STRUCT<INVTR_ID INTEGER, TYPE_CH VARCHAR(STRING), CALCTYPE_CH VARCHAR(STRING), ER_CST_ID INTEGER, ER_REQ_ID INTEGER, VENDOR_ID INTEGER, VENDINV_VC VARCHAR(STRING), STATUS_CH VARCHAR(STRING), STAGE_TI INTEGER, CHECKOUT_ID INTEGER, RECALCCK_LG BOOLEAN, RECALCALL_LG BOOLEAN, PATMATCH_LG BOOLEAN, DOCPATOVRD_LG BOOLEAN, LOCKED_LG BOOLEAN, SEGERRFLAG_LG BOOLEAN, HOLD_LG BOOLEAN, REASON_ID INTEGER, HOLDCOM_VC VARCHAR(STRING), ALLSEGFIN_LG BOOLEAN, INVDATE_DT BIGINT, SHIPDATE_DT BIGINT, PDTERMS_CH VARCHAR(STRING), PMTDUE_DT BIGINT, PMTTERMS_VC VARCHAR(STRING), BILLTERMS_CH VARCHAR(STRING), JOINT_LG BOOLEAN, COMMENT_VC VARCHAR(STRING), SOURCE_CH VARCHAR(STRING), ADDBY_ID VARCHAR(STRING), ADDED_DT BIGINT, CHGBY_ID VARCHAR(STRING), CHGED_DT BIGINT, APPROVED_LG BOOLEAN, MULTIPO_VC VARCHAR(STRING), PRVAUDITED_INVTR_ID INTEGER, PRVVENDOR_ID INTEGER, TRANSITDAYS_SI INTEGER, SHIP_NUM_VC VARCHAR(STRING), PRVTRANSITDAYS_SI INTEGER, PRVJOINT_LG BOOLEAN, CLONEDFROM_INVTR_ID INTEGER, LASTCALC_DT BIGINT, TMSFMANUAL_LG BOOLEAN, FRTRATERSOURCE_CH VARCHAR(STRING), ACTPICKUP_DT BIGINT, ROUTVEND_SI INTEGER, CALCVRSN_TI INTEGER, VENDORRANK_SI INTEGER, SEQ_SI INTEGER, PRVAUDITED_DT BIGINT, FRTRATERBATCHTYPE_CH VARCHAR(STRING), CURRENCY_TYPE_CD VARCHAR(STRING), EXCHANGE_DT BIGINT, EXCHANGE_RATE_LOCKED_LG BOOLEAN, EXCHANGE_DT_LOCKED_LG BOOLEAN, CUSTAPPROVED_LG BOOLEAN, FRTRATERMATCH_INVTR_ID INTEGER, CRC_INVOICE_LG BOOLEAN, RG_ROUTVEND_SI INTEGER, RG_PRVVE
Hi, IMHO you should apply the UnwrapFromEnvelope SMT on the topic from which you want to create the KSQL stream. Also, are you sure that the data in the topic is in Avro format? - Jiri Pechanec
Yes, when I print that topic from KSQL it displays it as Avro, and the schema in the OP was built automatically. Unwrapping it first makes total sense, so I'd have to create a connector that just sinks it back to another topic unwrapped, then use KSQL on that topic? - Jeff Beagley
You can apply it directly at the source connector so the topic will contain the unwrapped version. I've also checked the KSQL docs and they support the STRUCT datatype. Wouldn't it be possible to use that? Your CREATE STREAM would then contain the top-level fields op and ts_ms and the STRUCT fields before, after and source. - Jiri Pechanec
@JiriPechanec Yes, that does indeed work; that last schema I posted above is auto-generated from CREATE STREAM on a topic with CDC data in it. What's weird, and I only just noticed it, is that if you compare the Avro schema to what comes out in KSQL, it has dropped all my decimal columns. - Jeff Beagley
Please take a look at decimal.handling.mode. It might help to change it. - Jiri Pechanec

1 Answer

Seems like the comments about UnwrapFromEnvelope solve part of your problem. That only leaves the part about decimals not coming through.
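
For reference, if you keep the envelope instead of unwrapping, the STRUCT route from the comments might look roughly like the sketch below. It assumes the stream from the question (stream_invtr_t_json) exposes OP and AFTER as in the DESCRIBE output; the derived stream name invtr_updates and the choice of columns are made up for illustration.

```sql
-- Hypothetical derived stream; stream_invtr_t_json is the stream from the question.
-- Debezium sets op to 'c' (create), 'u' (update), 'd' (delete) or 'r' (snapshot read).
CREATE STREAM invtr_updates AS
  SELECT AFTER->INVTR_ID  AS INVTR_ID,
         AFTER->STATUS_CH AS STATUS_CH,
         AFTER->VENDOR_ID AS VENDOR_ID
  FROM stream_invtr_t_json
  WHERE OP = 'u';
```

The topic behind a stream like this holds flat rows rather than a Debezium envelope, which is probably why a sink that still has the unwrap transformation configured logs "Expected Envelope for transformation, passing it unchanged".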

Looking at the docs for the connector: https://debezium.io/documentation/reference/1.1/connectors/postgresql.html

I can see there is a decimal.handling.mode setting, as Jiri commented. At its default value of precise, it looks like it will output an Avro decimal in a format that ksqlDB would recognise, unless the source NUMERIC or DECIMAL types are used without any scale. At that point you end up with the STRUCT data structure, which includes a BYTES field. As the docs put it:

There is an exception to this rule. When the NUMERIC or DECIMAL types are used without any scale constraints then it means that the values coming from the database have a different (variable) scale for each value. In this case a type io.debezium.data.VariableScaleDecimal is used and it contains both value and scale of the transferred value.

So to import the data into ksqlDB you are going to need to do one of the following:

  1. Wait until we support a BYTES data type (which is not on our roadmap at present).
  2. Change the schema of the source table to define the scale of the column(s).
  3. Change decimal.handling.mode to some other setting. You can probably use string and then CAST the value to a decimal in ksqlDB.
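
As a rough sketch of option 3, assuming the source connector has been reconfigured with decimal.handling.mode=string and the records are unwrapped at the source (so the columns are top-level fields), the cast could look something like this. The stream names invtr_flat and invtr_typed are made up for illustration, DECIMAL(19, 4) mirrors the precision and scale shown in the Avro schema in the question, and the DECIMAL type is only available in newer KSQL/ksqlDB releases.

```sql
-- Assumes the topic now carries unwrapped rows with InvAmt_MN as a string.
-- invtr_flat and invtr_typed are hypothetical stream names.
CREATE STREAM invtr_flat WITH (KAFKA_TOPIC='InvTR_T', VALUE_FORMAT='AVRO');

-- Cast the string column back to a decimal with the original precision and scale.
CREATE STREAM invtr_typed AS
  SELECT INVTR_ID,
         STATUS_CH,
         CAST(INVAMT_MN AS DECIMAL(19, 4)) AS INVAMT_MN
  FROM invtr_flat;
```

On versions without DECIMAL, casting to DOUBLE is a possible fallback, at the cost of exact arithmetic on the amounts.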