Is there a way to create a stream from a topic specifying that the entire record should be considered a VARCHAR, so that I can create streams from it using extractjsonfield()? Sample records might look something like:

{
  "Header": {
    "RecType": "RecA",
    ... more header records in a fairly consistent format ...
  },
  "RAFld1": {
    "someFld": "some data",
    "someOtherField": 1.001
  },
  "RAFld2": {
    "aFld": "data",
    "anotherFld": 98.6,
    ...
  },
  ...
}

But the next record might look like:

{
  "Header": {
    "RecType": "RecB",
    ... more header records in a fairly consistent format ...
  },
  "RBFld1": {
    "randomFld": "random data",
    "randomOtherField": 1.001,
    ...
  }
}

I can work out how to define the initial stream with known fields as type VARCHAR and then use extractjsonfield() (with an appropriate WHERE clause), but I don't see a way to say that the top-level structure does not have consistently named fields.

This is the way my input topic is formatted; I can't change that format. I was hopeful that KSQL was going to be an elegant solution, but it looks like I'm stuck right from the start by not being able to handle this dynamic structure.

1 Answer

It doesn't matter if you name fields in your schema that are not present in every message; you'll just get null values.

I thought your question was an interesting one and have written up an explanation of how KSQL can work here - let me know if there's something else you want to do with it and I can expand on the answer.


  1. Inspect the raw data:

    ksql> PRINT 'source_data' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
    {"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}
    
  2. Register the source_data topic for use as a KSQL Stream called my_stream:

    CREATE STREAM my_stream (Header VARCHAR, \
                             RAFld1 VARCHAR, \
                             RAFld2 VARCHAR, \
                             RBFld1 VARCHAR) \
    WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');
    
  3. Inspect the messages. Note that in the second message (which is record type "B") there is no value for 'RAFld1' and so a null is shown:

    ksql> SELECT Header, RAFld1 FROM my_stream LIMIT 2;
    {"RecType":"RecA"} | {"someOtherField":1.001,"someFld":"some data"}
    {"RecType":"RecB"} | null
    
  4. Populate a new Kafka topic with just record type "A" values, using EXTRACTJSONFIELD both to filter on the record type in the Header and to extract named fields from the payload:

    CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS \
    SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField, \
            EXTRACTJSONFIELD(RAFld1,'$.someFld')        AS someFld, \
            EXTRACTJSONFIELD(RAFld2,'$.aFld')           AS aFld, \
            EXTRACTJSONFIELD(RAFld2,'$.anotherFld')     AS anotherFld \
            FROM my_stream \
    WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';
    

    Note that the serialisation is being switched to Avro so that the schema is available automatically to any consumer, without having to manually declare it.

  5. Observe the new stream has a schema and is populated continually with messages as they arrive in the original source_data topic:

    ksql> DESCRIBE recA_data;
    
    Name                 : RECA_DATA
    Field          | Type
    --------------------------------------------
    ROWTIME        | BIGINT           (system)
    ROWKEY         | VARCHAR(STRING)  (system)
    SOMEOTHERFIELD | VARCHAR(STRING)
    SOMEFLD        | VARCHAR(STRING)
    AFLD           | VARCHAR(STRING)
    ANOTHERFLD     | VARCHAR(STRING)
    --------------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    
    ksql> SELECT * FROM recA_data;
    1545240188787 | null | 1.001 | some data | data | 98.6
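
Along the same lines, here is a sketch of a matching stream for record type "B", using the RBFld1 fields from your sample. The stream name recB_data is just a placeholder, and I have not run this against your topic. Because EXTRACTJSONFIELD returns a VARCHAR (as the DESCRIBE output above shows), the sketch also wraps the numeric field in a CAST so that it lands as a DOUBLE; drop the CAST if you would rather keep it as a string.

    -- recB_data is a hypothetical name; the CAST is an optional addition, not part of the steps above
    CREATE STREAM recB_data WITH (VALUE_FORMAT='AVRO') AS \
    SELECT EXTRACTJSONFIELD(RBFld1,'$.randomFld') AS randomFld, \
           CAST(EXTRACTJSONFIELD(RBFld1,'$.randomOtherField') AS DOUBLE) AS randomOtherField \
           FROM my_stream \
    WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecB';

Each record type then gets its own typed stream, all fed from the single my_stream definition.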