
I have a stream of data in a topic that should be treated as a ksqlDB table (only the last value for a given key matters). This data contains updates to specific fields of entities stored in another topic. Is there any way in ksqlDB to process a stream that updates values in another stream/table/topic? The target topic holds entities with, say, 20 fields, but my update stream carries only 3 of them, so I want to update just those 3 fields while the other 17 stay the same in the target topic (treated as a table).


You can solve this with a JOIN plus a small adjustment. The following example creates an employee table with six columns, of which only `skill` and `level` need to be updated from another table.

1. Create the table from the source topic:

CREATE TABLE TBL_EMPLOYEE (
    `employee_id` VARCHAR,
    `name` VARCHAR,
    `lastName` VARCHAR,
    `age` INT,
    `skill` VARCHAR,
    `level` VARCHAR
) WITH (
    KAFKA_TOPIC = 'employee-topic-input',
    PARTITIONS = 3,
    VALUE_FORMAT = 'JSON',
    KEY = '`employee_id`'
);
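Declaring `TBL_EMPLOYEE` as a TABLE is what gives the "only the last value of a given key matters" behaviour from the question: ksqlDB reads the topic as a changelog and keeps the newest record per key. A minimal Python model of that idea, assuming records arrive as `(key, value)` pairs in topic order; `materialize` is a hypothetical helper for illustration, not part of ksqlDB:

```python
from typing import Dict, Iterable, Optional, Tuple

# (key, value) pair as read from the topic; a None value is a tombstone.
Record = Tuple[str, Optional[dict]]

def materialize(changelog: Iterable[Record]) -> Dict[str, dict]:
    """Fold a changelog into table state: the last value per key wins.

    A None value is treated as a tombstone that deletes the key,
    mirroring how a null-valued record removes a row from a ksqlDB table.
    """
    table: Dict[str, dict] = {}
    for key, value in changelog:
        if value is None:
            table.pop(key, None)  # tombstone: drop the row if present
        else:
            table[key] = value    # upsert: newer record replaces the older one
    return table

changelog = [
    ("117", {"skill": "java", "level": "jr"}),
    ("118", {"skill": "AWS", "level": "architect"}),
    ("117", {"skill": "scala", "level": "mid"}),
]
print(materialize(changelog))
```

The sketch ignores partitioning and timestamps; it only shows why earlier records for a key stop mattering once a newer one arrives.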

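2. Create the table that holds the desired updates (it can be a stream or a table, including one produced by another query). It uses the same key and the same number of partitions as `TBL_EMPLOYEE`, because ksqlDB requires both sides of a join to be co-partitioned: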

CREATE TABLE TBL_EMPLOYEE_DESIRED_UPDATES (
    `employee_id` VARCHAR,
    `skill` VARCHAR,
    `level` VARCHAR
) WITH (
    KAFKA_TOPIC = 'employee-desired-updates-topic',
    PARTITIONS = 3,
    VALUE_FORMAT = 'JSON',
    KEY = '`employee_id`'
);

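3. Create the final table that applies the updates. The LEFT JOIN keeps every row from the first table; if the second table has no update for a key, `IFNULL` falls back to the original `skill` and `level` values. The remaining columns are copied straight from `EMP` (add ``EMP.`age` `` the same way if you need it in the result).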

-- Read the input topics from the beginning so existing rows are included.
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE TBL_EMPLOYEE_FINAL AS
    SELECT
        EMP.`employee_id` AS `employee_id`,
        EMP.`name` AS `name`,
        EMP.`lastName` AS `lastName`,
        -- Use the updated value when one exists, otherwise keep the original.
        IFNULL(UPD.`skill`, EMP.`skill`) AS `skill`,
        IFNULL(UPD.`level`, EMP.`level`) AS `level`
    FROM TBL_EMPLOYEE AS EMP
    LEFT JOIN TBL_EMPLOYEE_DESIRED_UPDATES UPD ON EMP.ROWKEY = UPD.ROWKEY
    EMIT CHANGES;
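For a single key, the query keeps the `EMP` row and, for `skill` and `level`, prefers the `UPD` value when it is not NULL. A minimal Python sketch of that per-row logic, assuming rows are plain dicts; `left_join_row`, `ifnull` and `UPDATABLE` are hypothetical names used only to illustrate the semantics:

```python
from typing import Optional

# Hypothetical: the columns the updates table may overwrite (skill and level here).
UPDATABLE = ("skill", "level")

def ifnull(value, default):
    """Same idea as ksqlDB's IFNULL: return default when value is None."""
    return default if value is None else value

def left_join_row(emp: dict, upd: Optional[dict]) -> dict:
    """Combine one EMP row with its matching UPD row, or None if there is none."""
    upd = upd or {}     # LEFT JOIN: a missing right-hand row acts like all-NULL columns
    result = dict(emp)  # untouched columns are copied from the base row
    for col in UPDATABLE:
        result[col] = ifnull(upd.get(col), emp.get(col))
    return result

emp = {"employee_id": "118", "name": "Anthony", "lastName": "Stark",
       "skill": "AWS", "level": "architect"}
print(left_join_row(emp, None))
print(left_join_row(emp, {"employee_id": "118", "skill": "mongo", "level": "senior"}))
```

One consequence of the `IFNULL` design: an update that sets a column to NULL cannot clear it, because the original value shows through instead.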

Example:

INSERT INTO TBL_EMPLOYEE (`employee_id`, `name`, `lastName`, `age`, `skill`, `level`) VALUES ('117', 'John', 'Constantine', 30, 'java', 'jr');
INSERT INTO TBL_EMPLOYEE (`employee_id`, `name`, `lastName`, `age`, `skill`, `level`) VALUES ('118', 'Anthony', 'Stark', 40, 'AWS', 'architect');
INSERT INTO TBL_EMPLOYEE (`employee_id`, `name`, `lastName`, `age`, `skill`, `level`) VALUES ('119', 'Clark', 'Kent', 35, 'python', 'senior');
ksql> SELECT * FROM TBL_EMPLOYEE_FINAL EMIT CHANGES;
+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|ROWTIME                        |ROWKEY                         |employee_id                    |name                           |lastName                       |skill                          |level                          |
+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|1611440363833                  |119                            |119                            |Clark                          |Kent                           |python                         |senior                         |
|1611440361284                  |117                            |117                            |John                           |Constantine                    |java                           |jr                             |
|1611440361408                  |118                            |118                            |Anthony                        |Stark                          |AWS                            |architect                      |

Next, send an update for employee 118:

INSERT INTO TBL_EMPLOYEE_DESIRED_UPDATES (`employee_id`, `skill`, `level`) VALUES ('118', 'mongo', 'senior');

The result now contains a new row for key 118:

ksql> SELECT  * from TBL_EMPLOYEE_FINAL EMIT CHANGES;
+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|ROWTIME                        |ROWKEY                         |employee_id                    |name                           |lastName                       |skill                          |level                          |
+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|1611440363833                  |119                            |119                            |Clark                          |Kent                           |python                         |senior                         |
|1611440361284                  |117                            |117                            |John                           |Constantine                    |java                           |jr                             |
|1611440361408                  |118                            |118                            |Anthony                        |Stark                          |AWS                            |architect                      |
|1611440585726                  |118                            |118                            |Anthony                        |Stark                          |mongo                          |senior                         |

Treat the latest row for a key as the current value, here the row for 118 with the two modified fields. The earlier row for 118 is part of the table's changelog: Kafka records are immutable, so an update is appended as a new record instead of overwriting the old one.
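The duplicate key 118 in the output above is expected: each change on either side of the join produces a new output record, and the current state is the newest record per key. A simplified Python model of that behaviour, assuming both inputs are replayed as one ordered list of `("EMP" | "UPD", key, row)` events; it ignores timestamps and partitioning, and for an update whose key is missing from the left table this sketch simply emits nothing. `run_join` and `merge` are hypothetical helpers:

```python
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, str, dict]  # (side, key, row); side is "EMP" or "UPD"

def merge(emp: dict, upd: Optional[dict]) -> dict:
    """LEFT JOIN + IFNULL for the two updatable columns."""
    upd = upd or {}
    out = dict(emp)
    for col in ("skill", "level"):
        if upd.get(col) is not None:
            out[col] = upd[col]
    return out

def run_join(events: List[Event]) -> List[dict]:
    """Return the changelog of the joined table: one output row per input change."""
    emp_state: Dict[str, dict] = {}
    upd_state: Dict[str, dict] = {}
    changelog: List[dict] = []
    for side, key, row in events:
        (emp_state if side == "EMP" else upd_state)[key] = row
        if key in emp_state:  # sketch assumption: output only when the left row exists
            changelog.append(merge(emp_state[key], upd_state.get(key)))
    return changelog

events = [
    ("EMP", "117", {"employee_id": "117", "skill": "java", "level": "jr"}),
    ("EMP", "118", {"employee_id": "118", "skill": "AWS", "level": "architect"}),
    ("EMP", "119", {"employee_id": "119", "skill": "python", "level": "senior"}),
    ("UPD", "118", {"employee_id": "118", "skill": "mongo", "level": "senior"}),
]
changelog = run_join(events)
latest = {row["employee_id"]: row for row in changelog}  # newest row per key wins
for row in changelog:
    print(row)
```

Reading `changelog` gives the same history as the `EMIT CHANGES` query, while `latest` corresponds to the current contents of the table.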