
I'm playing around with Confluent Community and a Postgres database and am running into the following issue.

The events flow into Kafka fine and the topics are created. I created a stream out of a topic and rekeyed it, because the key was null.

From the new topic underlying the rekeyed stream, I created a table. The goal is to have a constantly up-to-date table of objects (here, categories).
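For reference, the rekeying and table creation looked roughly like this (a sketch only; the stream names are made up and the column names are reconstructed from the DESCRIBE output further down, so the exact statements may differ):

```sql
-- Stream over the raw Debezium topic (messages have a null key)
CREATE STREAM categories_stream WITH (KAFKA_TOPIC='categories', VALUE_FORMAT='AVRO');

-- Rekey on the category id; the new stream's backing topic is categories_rk
CREATE STREAM categories_rk_stream WITH (KAFKA_TOPIC='categories_rk') AS
  SELECT *, CAST(category_id AS VARCHAR) AS category_id_st
  FROM categories_stream
  PARTITION BY category_id_st;

-- Table over the rekeyed topic, keyed on the stringified id
CREATE TABLE categories WITH (
  KAFKA_TOPIC='categories_rk',
  VALUE_FORMAT='AVRO',
  KEY='CATEGORY_ID_ST'
);
```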

The problem is that the table never shows the updated data when I do a manual UPDATE in the database; the rows just keep being appended as if it were a stream. I also ran the SELECT a second time, since I know that 'update' rows can appear while a query is still running.

ksql> select * from categories;
1568287458487 | 1 | 1 | Beverages | Soft drinks, coffees, teas, beers, and ales
1568287458487 | 2 | 2 | Condiments | Sweet and savory sauces, relishes, spreads, and seasonings
1568287458488 | 3 | 3 | Confections | Desserts, candies, and sweet breads
1568287458488 | 4 | 4 | Dairy Products | Cheeses
1568287458488 | 5 | 5 | Grains/Cereals | Breads, crackers, pasta, and cereal
1568287458488 | 6 | 6 | Meat/Poultry | Prepared meats
1568287458489 | 7 | 7 | Produce | Dried fruit and bean curd
1568287458489 | 8 | 8 | Seafood | Seaweed and fish
1568288647248 | 8 | 8 | Seafood2 | Seaweed and fish
1568290562250 | 1 | 1 | asdf | Soft drinks, coffees, teas, beers, and ales
1568296165250 | 8 | 8 | Seafood3 | Seaweed and fish
1568296704747 | 8 | 8 | Seafood4 | Seaweed and fish
^CQuery terminated
ksql> select * from categories;
1568287458487 | 1 | 1 | Beverages | Soft drinks, coffees, teas, beers, and ales
1568287458487 | 2 | 2 | Condiments | Sweet and savory sauces, relishes, spreads, and seasonings
1568287458488 | 3 | 3 | Confections | Desserts, candies, and sweet breads
1568287458488 | 4 | 4 | Dairy Products | Cheeses
1568287458488 | 5 | 5 | Grains/Cereals | Breads, crackers, pasta, and cereal
1568287458488 | 6 | 6 | Meat/Poultry | Prepared meats
1568287458489 | 7 | 7 | Produce | Dried fruit and bean curd
1568287458489 | 8 | 8 | Seafood | Seaweed and fish
1568288647248 | 8 | 8 | Seafood2 | Seaweed and fish
1568290562250 | 1 | 1 | asdf | Soft drinks, coffees, teas, beers, and ales
1568296165250 | 8 | 8 | Seafood3 | Seaweed and fish
1568296704747 | 8 | 8 | Seafood4 | Seaweed and fish
^CQuery terminated
ksql> 

Categories table in postgres:

CREATE TABLE categories (
    category_id smallint NOT NULL,
    category_name character varying(15) NOT NULL,
    description text
);

categories table in KSQL:

ksql> describe extended categories;

Name                 : CATEGORIES
Type                 : TABLE
Key field            : CATEGORY_ID_ST
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : categories_rk (partitions: 1, replication: 1)

 Field          | Type                      
--------------------------------------------
 ROWTIME        | BIGINT           (system) 
 ROWKEY         | VARCHAR(STRING)  (system) 
 CATEGORY_ID_ST | VARCHAR(STRING)           
 CATEGORY_NAME  | VARCHAR(STRING)           
 DESCRIPTION    | VARCHAR(STRING)           
 MESSAGETOPIC   | VARCHAR(STRING)           
 MESSAGESOURCE  | VARCHAR(STRING)           
--------------------------------------------

How is it possible that a table that is supposed to have a unique ROWKEY keeps adding more 'update' rows with the same ROWKEY?

I'm actually expecting the table to display an always up-to-date list of categories, as stated in https://www.youtube.com/watch?v=DPGn-j7yD68&list=PLa7VYi0yPIH2eX8q3mPpZAn3qCS1eDX8W&index=9:

"A TABLE is a materialized view of events with only the latest values for each key". But maybe I misunderstood that?

Welcome to StackOverflow! Can you share the DDL that you're running and details of how you're getting data from Postgres into Kafka? – Robin Moffatt
Postgres data is flowing into Kafka via Debezium. I updated the question with the DDL. – reneveyj
Can you share your KSQL DDL too please? – Robin Moffatt
Thanks for your help Robin. I updated the question with additional data. – reneveyj

1 Answer


A table in KSQL is constantly being updated as new data arrives. The output topic that the table's rows are written to is known as a changelog: it is an immutable log of changes to the table. If a specific key is updated multiple times, then the output topic will contain multiple messages for the same key. Each new value replaces the last.

When you run a query such as:

select * from categories;

in the version of KSQL you're using, you're not running a traditional query like you would in a traditional RDBMS. Such a query would return the current set of rows in the table. In KSQL, the query above streams all updates to the rows as they occur. Hence, if the same key is updated multiple times, you'll see the same key output by the query multiple times.

In more recent versions of ksqlDB the above query would instead be written:

select * from categories emit changes;

Inside KSQL each key is stored in the materialized table only once, and the stored value is always the most recent one seen.
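In those newer versions the distinction is explicit in the syntax (a sketch, assuming the CATEGORIES table from the question and ksqlDB 0.6+):

```sql
-- Push query: emits every change as it happens and runs until terminated,
-- which is what the question's SELECT was doing
SELECT * FROM categories EMIT CHANGES;

-- Pull query: returns the current value for a key and then completes,
-- like a lookup against a traditional database table
SELECT * FROM categories WHERE ROWKEY = '8';
```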