
I'm trying to create a KSQL table that keeps the latest version of an entity, and I'd like to know the recommended approach for this.

So far I've tried the following. I loaded invoices into Kafka through Debezium and then created this stream so that I could use them from KSQL:

ksql> create stream invoice_stream with (kafka_topic='dbserver1.invoices.invoice', value_format='AVRO');

Debezium adds data about the state of the database table row before and after the change, which makes it a bit difficult to use, so I created another stream on top to get only the data I am interested in:

create stream invoice 
      with (kafka_topic='invoice', value_format='AVRO')
      as
      select i.before->id as before_id,
             i.after->id as after_id,
             ifnull(i.transaction->id, 'NA') as transaction_id,
             i.after->description as description,
             i.after->invoice_date as invoice_date,
             i.after->status as status
      from invoice_stream i;
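To make clear what this projection does to each Debezium change event, here is a minimal Python sketch that applies the same field mapping to a plain dict. It assumes the envelope shape implied by the statement (`before`, `after` and `transaction` sub-records); the function names `project_invoice` and `_get` are hypothetical, and this is a conceptual model, not how KSQL executes the query.

```python
from typing import Any, Dict, Optional

Event = Dict[str, Any]


def _get(struct: Optional[Dict[str, Any]], name: str) -> Any:
    # Dereferencing a field of a null struct (e.g. i.before->id on an insert) gives null
    return None if struct is None else struct.get(name)


def project_invoice(event: Event) -> Dict[str, Any]:
    """Flatten one Debezium-style change event the way the INVOICE stream does."""
    before = event.get("before")
    after = event.get("after")
    txn_id = _get(event.get("transaction"), "id")
    return {
        "BEFORE_ID": _get(before, "id"),
        "AFTER_ID": _get(after, "id"),
        # ifnull(i.transaction->id, 'NA')
        "TRANSACTION_ID": "NA" if txn_id is None else txn_id,
        "DESCRIPTION": _get(after, "description"),
        "INVOICE_DATE": _get(after, "invoice_date"),
        "STATUS": _get(after, "status"),
    }


# Hypothetical update of invoice 5, modelled on the last row of the push query output
update = {
    "before": {"id": 5, "description": "test line added later", "invoice_date": 18263, "status": "N"},
    "after": {"id": 5, "description": "test line added later", "invoice_date": 18263, "status": "P"},
    "transaction": {"id": "625"},
}
print(project_invoice(update))
```

Note that every event, including each update, still produces one output row; the projection only reshapes events and does not collapse history.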

So far, so good: I can query the stream with a push query and see what I expect:

ksql> select * from invoice emit changes;
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|ROWTIME                 |ROWKEY                  |BEFORE_ID               |AFTER_ID                |TRANSACTION_ID          |DESCRIPTION             |INVOICE_DATE            |STATUS                  |
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|1583961059498           |                        |null                    |1                       |NA                      |Invoice A               |18201                   |N                       |
|1583961059499           |                        |null                    |2                       |NA                      |Invoice B               |18205                   |N                       |
|1583961059499           |                        |null                    |3                       |NA                      |Invoice C               |18210                   |N                       |
|1583961059499           |                        |null                    |4                       |NA                      |Invoice D               |18215                   |N                       |
|1583961263233           |                        |null                    |5                       |623                     |test line added later   |18263                   |N                       |
|1584007291546           |                        |5                       |5                       |625                     |test line added later   |18263                   |P                       |

As there is no key, I created another stream on top where I specify the partition:

ksql> create stream invoice_rekeyed as select * from invoice partition by after_id;
ksql> describe invoice_rekeyed;

Name                 : INVOICE_REKEYED
 Field          | Type                      
--------------------------------------------
 ROWTIME        | BIGINT           (system) 
 ROWKEY         | VARCHAR(STRING)  (system) 
 BEFORE_ID      | INTEGER                   
 AFTER_ID       | INTEGER          (key)    
 TRANSACTION_ID | VARCHAR(STRING)           
 DESCRIPTION    | VARCHAR(STRING)           
 INVOICE_DATE   | INTEGER                   
 STATUS         | VARCHAR(STRING)           
--------------------------------------------
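Conceptually, `PARTITION BY after_id` only changes each record's key; it does not remove older versions of a row. The following Python sketch (the function `rekey` is hypothetical and is not how KSQL implements repartitioning) shows both versions of invoice 5 ending up with the same string key "5", which is consistent with the two rows for key 5 that appear later in the push query on INVOICE_TABLE.

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple

Row = Dict[str, Any]


def rekey(rows: Iterable[Row], key_column: str) -> List[Tuple[Optional[str], Row]]:
    """Model of PARTITION BY: the new record key is the row's key_column value.

    ROWKEY is a VARCHAR in the DESCRIBE output, so the key is rendered as a string.
    Every row is kept: rekeying changes the key, it does not drop older versions.
    """
    keyed = []
    for row in rows:
        value = row.get(key_column)
        keyed.append((None if value is None else str(value), row))
    return keyed


rows = [
    {"AFTER_ID": 5, "STATUS": "N"},
    {"AFTER_ID": 5, "STATUS": "P"},
]
for key, row in rekey(rows, "AFTER_ID"):
    print(key, row)
```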

Finally I created a table like this:

create table invoice_table(before_id int, after_id int, transaction_id string, description string, invoice_date int, status string) 
with (kafka_topic='INVOICE_REKEYED', key='after_id', value_format='AVRO');

At this point I would expect to be able to query the table by ROWKEY; however, I get the following message:

ksql> select * from invoice_table where rowkey = 5;
Table 'INVOICE_TABLE' is not materialized. Refer to https://cnfl.io/queries for info on query types. If you intended to issue a push query, resubmit with the EMIT CHANGES clause
 KSQL currently only supports pull queries on materialized aggregate tables. i.e. those created by a 'CREATE TABLE AS SELECT <fields>, <aggregate_functions> FROM <sources> GROUP BY <key>' style statement.
Query syntax in KSQL has changed. There are now two broad categories of queries:
- Pull queries: query the current state of the system, return a result, and terminate. 
- Push queries: query the state of the system in motion and continue to output results until they meet a LIMIT condition or are terminated by the user.

'EMIT CHANGES' is used to to indicate a query is a push query. To convert a pull query into a push query, which was the default behavior in older versions of KSQL, add `EMIT CHANGES` to the end of the statement 
before any LIMIT clause.

For example, the following are pull queries:
    'SELECT * FROM X WHERE ROWKEY=Y;' (non-windowed table)
    'SELECT * FROM X WHERE ROWKEY=Y AND WINDOWSTART>=Z;' (windowed table)

The following is a push query:
    'SELECT * FROM X EMIT CHANGES;'

Note: Persistent queries, e.g. `CREATE TABLE AS ...`, have an implicit `EMIT CHANGES`, but we recommend adding `EMIT CHANGES` to these statements.

Also, if I query it as a push query, I see more than one row for key 5:

ksql> select * from invoice_table emit changes;
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|ROWTIME                 |ROWKEY                  |BEFORE_ID               |AFTER_ID                |TRANSACTION_ID          |DESCRIPTION             |INVOICE_DATE            |STATUS                  |
+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|1583961059498           |1                       |null                    |1                       |NA                      |Invoice A               |18201                   |N                       |
|1583961059499           |2                       |null                    |2                       |NA                      |Invoice B               |18205                   |N                       |
|1583961059499           |3                       |null                    |3                       |NA                      |Invoice C               |18210                   |N                       |
|1583961059499           |4                       |null                    |4                       |NA                      |Invoice D               |18215                   |N                       |
|1583961263233           |5                       |null                    |5                       |623                     |test line added later   |18263                   |N                       |
|1584007291546           |5                       |5                       |5                       |625                     |test line added later   |18263                   |P                       |

I would like to understand why the table has not been materialized, since according to the message above that is what prevents me from querying the table by ROWKEY.

Thanks in advance

UPDATE

Trying the example Robin pointed to, I actually got the expected behavior: running the query while updating the original database row makes the changes appear:

ksql> select * from customers where id = 5 emit changes;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME          |ROWKEY           |ID               |FIRST_NAME       |LAST_NAME        |EMAIL            |GENDER           |CLUB_STATUS      |COMMENTS         |CREATE_TS        |UPDATE_TS        |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1584102664415    |5                |5                |Hansiain         |Coda             |[email protected]|Male             |platinum         |Centralized full-|2020-03-13T12:29:|2020-03-13T12:29:|
|                 |                 |                 |                 |                 |                 |                 |                 |range approach   |53Z              |53Z              |
|1584102741712    |5                |5                |Rodrigo          |Coda             |[email protected]|Male             |platinum         |Centralized full-|2020-03-13T12:29:|2020-03-13T12:32:|
|                 |                 |                 |                 |                 |                 |                 |                 |range approach   |53Z              |21Z              |

However, if the query is terminated and run again, only the latest version is available:

ksql> select * from customers where id = 5 emit changes;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME          |ROWKEY           |ID               |FIRST_NAME       |LAST_NAME        |EMAIL            |GENDER           |CLUB_STATUS      |COMMENTS         |CREATE_TS        |UPDATE_TS        |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1584102741712    |5                |5                |Rodrigo          |Coda             |[email protected]|Male             |platinum         |Centralized full-|2020-03-13T12:29:|2020-03-13T12:32:|
|                 |                 |                 |                 |                 |                 |                 |                 |range approach   |53Z              |21Z              |

However, doing what is in principle the same thing with my example always returns all versions of the row:

ksql> print 'dbserver1.invoices.invoice' from beginning limit 50;
Format:AVRO
3/13/20 12:23:09 PM UTC, 1, {"id": 1, "description": "Invoice A", "invoice_date": 18201, "status": "N", "__op": "r", "__ts_ms": 1584102188934, "__transaction_id": null}
3/13/20 12:23:09 PM UTC, 2, {"id": 2, "description": "Invoice B", "invoice_date": 18205, "status": "N", "__op": "r", "__ts_ms": 1584102188936, "__transaction_id": null}
3/13/20 12:23:09 PM UTC, 3, {"id": 3, "description": "Invoice C", "invoice_date": 18210, "status": "N", "__op": "r", "__ts_ms": 1584102188938, "__transaction_id": null}
3/13/20 12:23:09 PM UTC, 4, {"id": 4, "description": "Invoice D", "invoice_date": 18215, "status": "N", "__op": "r", "__ts_ms": 1584102188938, "__transaction_id": null}
^CTopic printing ceased
ksql> create table invoice_table with (kafka_topic='dbserver1.invoices.invoice', value_format='AVRO');

 Message       
---------------
 Table created 
---------------

ksql> select * from invoice_table where id = 4 emit changes;
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|ROWTIME              |ROWKEY               |ID                   |DESCRIPTION          |INVOICE_DATE         |STATUS               |__OP                 |__TS_MS              |__TRANSACTION_ID     |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|1584102189675        |4                    |4                    |Invoice D            |18215                |N                    |r                    |1584102188938        |null                 |
|1584102365378        |4                    |4                    |Invoice D UPDATED    |18215                |N                    |u                    |1584102365128        |623                  |
^CQuery terminated
ksql> select * from invoice_table where id = 4 emit changes;
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|ROWTIME              |ROWKEY               |ID                   |DESCRIPTION          |INVOICE_DATE         |STATUS               |__OP                 |__TS_MS              |__TRANSACTION_ID     |
+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|1584102189675        |4                    |4                    |Invoice D            |18215                |N                    |r                    |1584102188938        |null                 |
|1584102365378        |4                    |4                    |Invoice D UPDATED    |18215                |N                    |u                    |1584102365128        |623                  |

Is there any configuration that could explain this difference in behavior?

ANSWER

There are a few things to unpick and help you with here, but the top-level answer is that pull queries are not supported on non-materialized tables; you haven't materialized this one, nor can you until #3985 is delivered.

As you've seen, you can run a push query against a table. The multiple outputs you see are due to the state changing: each update to a key is emitted as a new row. If you cancel your push query and re-run it, you'll see just a single state for each key.
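The distinction between a push query over a table and a pull query against it can be modelled as a changelog versus the state folded from that changelog. In this Python sketch (the names `materialize` and `pull_query` are hypothetical; it is a conceptual model, not ksqlDB's implementation), a push query corresponds to reading every change, while a pull query is a single lookup in the latest-value-per-key map, which is the materialized state that a pull query needs.

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Optional[Row]]  # (key, new value); None models a tombstone/delete


def materialize(changelog: Iterable[Change]) -> Dict[str, Row]:
    """Fold a changelog into the latest state per key."""
    state: Dict[str, Row] = {}
    for key, value in changelog:
        if value is None:
            state.pop(key, None)  # a null value deletes the key
        else:
            state[key] = value  # a later update replaces the earlier one
    return state


def pull_query(state: Dict[str, Row], key: str) -> Optional[Row]:
    """Look up the current value for one key and return immediately."""
    return state.get(key)


changelog: List[Change] = [
    ("1", {"STATUS": "N"}),
    ("5", {"STATUS": "N"}),
    ("5", {"STATUS": "P"}),
]
# A push query over the table would see all three changes, including both for key "5"
print(len(changelog))
# The materialized state keeps one row per key: the latest
print(pull_query(materialize(changelog), "5"))
```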


Debezium adds data about the state of the database table row before and after the change, which makes it a bit difficult to use

Check out the io.debezium.transforms.ExtractNewRecordState Single Message Transform, which flattens the payload and puts just the current state in your message:

'transforms'= 'unwrap',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
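To illustrate the effect of this transform on a single event, here is a minimal Python sketch, assuming Debezium's standard envelope fields `before`, `after` and `op`. The function `unwrap` is hypothetical; it models the effect only, not the SMT's Java implementation or its full set of options (in particular, how deletes are handled is configurable in the real transform).

```python
from typing import Any, Dict, Optional

Row = Dict[str, Any]


def unwrap(envelope: Dict[str, Any]) -> Optional[Row]:
    """Keep only the new row state of a Debezium change event (conceptual model).

    op is "c" (create), "u" (update), "r" (snapshot read) or "d" (delete).
    """
    if envelope.get("op") == "d":
        # Assumption for this sketch: deletes are simply dropped
        return None
    return envelope.get("after")


event = {
    "before": {"id": 4, "description": "Invoice D", "status": "N"},
    "after": {"id": 4, "description": "Invoice D UPDATED", "status": "N"},
    "op": "u",
}
print(unwrap(event))
```

With the payload flattened this way, the intermediate INVOICE stream that picks fields out of `after` is no longer needed.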

As there is no key, I created another stream on top where I specify the partition:

This is one approach, but a better one is to set the key as part of the Kafka Connect ingest:

'transforms'= 'extractkey',
'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractkey.field'= 'id',
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
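The two settings above work as a pair: Debezium keys each message by a struct of the row's primary key columns, `ExtractField$Key` replaces that struct with the value of its `id` field, and `StringConverter` writes that value as a plain string. The Python sketch below models that pipeline; the function names `extract_field_key` and `string_converter` are hypothetical stand-ins for the Connect classes, not their real APIs.

```python
from typing import Any, Dict, Optional


def extract_field_key(key: Dict[str, Any], field: str) -> Any:
    """Model of ExtractField$Key: replace the key struct with one of its fields."""
    return key[field]


def string_converter(value: Any) -> Optional[bytes]:
    """Model of StringConverter: serialize the key as its UTF-8 string form."""
    return None if value is None else str(value).encode("utf-8")


debezium_key = {"id": 5}  # Debezium's message key: the row's primary key columns
print(string_converter(extract_field_key(debezium_key, "id")))
```

Because the key on the topic is then just the invoice id, a table declared over that topic is keyed by id directly, without the extra PARTITION BY stream.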

For an example of these in action, check out this recent QCon workshop.