I have two topics, X and Y, each with its own Avro schema and a lot of fields. I want to create a stream-table join between them and write the result to another topic in the following format:

{
   id, // the id used to join them
   x_name : X,
   y_name: Y
}

In other words, I want to join the two with each source nested under its own field. I can join them the normal way, but then all fields are flattened out. Can this be achieved with ksqlDB? I've tried to find a good way of doing this without success.

EDIT:

Adding more information and an example. Say I have two topics with this type of data.

product_supply

{
  "product_id": 1,
  "name": "name",
  "stock": 11,
  "price": "141",
  "storage_ids": [1, 2, 3]
}

product_information

{
  "product_id": 1,
  "description": "151",
  "manufacturer": "ABC",
  "Vendor_id": "5"
}

I'd like to use ksqlDB to join these topics in an unflattened manner and publish the result to a topic, like this:

{
  "product_id": 1,
  "product_information": {
      "product_id": 1,
      "description": "151",
      "manufacturer": "ABC",
      "Vendor_id": "5"
  },
  "product_supply": {
      "product_id": 1,
      "name": "name",
      "stock": 11,
      "price": "141",
      "storage_ids": [1, 2, 3]
  }
}

I've added a schema for each topic and would like, if possible, to use those schemas without having to explicitly define each field in ksqlDB.

Welcome to StackOverflow! Could you edit your question to include an example of the two message types that you're trying to join please? - Robin Moffatt
Hi Robin, thanks! I've added an example to demonstrate my intention. I hope it is clear. - Walf F

1 Answer

There's a good guide here on working with structured data in ksqlDB. Based on this I was able to get this to work:

  • Create sample data

    -- Stream side of the join
    CREATE STREAM PRODUCT_SUPPLY (PRODUCT_ID INT, NAME VARCHAR, STOCK INT, PRICE INT, STORAGE_IDS ARRAY<INT>)
      WITH (KAFKA_TOPIC='PRODUCT_SUPPLY', REPLICAS=1, PARTITIONS=6, VALUE_FORMAT='AVRO');
    -- Table side of the join, keyed by PRODUCT_ID
    CREATE TABLE PRODUCT_INFORMATION (PRODUCT_ID INT PRIMARY KEY, DESCRIPTION VARCHAR, MANUFACTURER VARCHAR, VENDOR_ID INT)
      WITH (KAFKA_TOPIC='PRODUCT_INFO', REPLICAS=1, PARTITIONS=6, VALUE_FORMAT='AVRO');
    INSERT INTO PRODUCT_SUPPLY VALUES (1, 'NAME', 11, 141, ARRAY[1, 2, 3]);
    INSERT INTO PRODUCT_INFORMATION VALUES (1, '151', 'abc', 5);
    
  • Query the data

    SET 'auto.offset.reset' = 'earliest';
    
    SELECT PS.PRODUCT_ID AS PRODUCT_ID,
          STRUCT(NAME        := PS.NAME,
                  STOCK       := PS.STOCK,
                  PRICE       := PS.PRICE,
                  STORAGE_IDS := PS.STORAGE_IDS) AS PRODUCT_SUPPLY,
          STRUCT(DESCRIPTION  := PI.DESCRIPTION,
                  MANUFACTURER := PI.MANUFACTURER,
                  VENDOR_ID    := PI.VENDOR_ID) AS PRODUCT_INFORMATION
      FROM PRODUCT_SUPPLY PS
          LEFT JOIN PRODUCT_INFORMATION PI
          ON PS.PRODUCT_ID=PI.PRODUCT_ID
    EMIT CHANGES LIMIT 1;
    
    +-------------------------+-------------------------+-------------------------+
    |PRODUCT_ID               |PRODUCT_SUPPLY           |PRODUCT_INFORMATION      |
    +-------------------------+-------------------------+-------------------------+
    |1                        |{NAME=NAME, STOCK=11, PRI|{DESCRIPTION=151, MANUFAC|
    |                         |CE=141, STORAGE_IDS=[1, 2|TURER=abc, VENDOR_ID=5}  |
    |                         |, 3]}                    |                         |
    Limit Reached
    Query terminated
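
The query above is transient and only prints to the console. To publish the nested result to a topic, as the question asks, the same SELECT can be wrapped in a persistent query. This is a minimal sketch rather than a tested setup; the stream and topic name PRODUCT_NESTED is a placeholder I made up for illustration, and the LIMIT clause is dropped so the query keeps running:

```sql
-- PRODUCT_NESTED is a hypothetical name, not taken from the question.
-- Reuses the PRODUCT_SUPPLY stream and PRODUCT_INFORMATION table created above.
CREATE STREAM PRODUCT_NESTED
  WITH (KAFKA_TOPIC='PRODUCT_NESTED', VALUE_FORMAT='AVRO') AS
SELECT PS.PRODUCT_ID AS PRODUCT_ID,
       -- Each source is packed into its own STRUCT column
       STRUCT(NAME        := PS.NAME,
              STOCK       := PS.STOCK,
              PRICE       := PS.PRICE,
              STORAGE_IDS := PS.STORAGE_IDS) AS PRODUCT_SUPPLY,
       STRUCT(DESCRIPTION  := PI.DESCRIPTION,
              MANUFACTURER := PI.MANUFACTURER,
              VENDOR_ID    := PI.VENDOR_ID) AS PRODUCT_INFORMATION
  FROM PRODUCT_SUPPLY PS
       LEFT JOIN PRODUCT_INFORMATION PI
       ON PS.PRODUCT_ID = PI.PRODUCT_ID
EMIT CHANGES;
```

Note that the fields of each STRUCT are still listed by hand here. Also, as far as I'm aware, PRODUCT_ID is the join key and so is written to the Kafka record key rather than the value; if it is needed inside the value as well, it may be worth checking whether AS_VALUE(PS.PRODUCT_ID) is available in your ksqlDB version.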