1
votes

We are performing the POC with KSQLDB and some doubts :-

I have a Kafka topic named USERPROFILE which have around 100 million unique records and 10 days retention policy. This Kafka topic continues to receive INSERT/UPDATE type of events in real-time from its underlying RDBMS table.

Following is the simple structure of the record being received in this kafka topic :-

{"userid":1001,"firstname":"Hemant","lastname":"Garg","countrycode":"IND","rating":3.7} 

1.) We have opened a Kafka Stream on this aforesaid TOPIC :-

create STREAM userprofile_stream (userid INT, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'USERPROFILE')>;

2.) Because, there can be updates for given userId and we want only unique records (for each userId), we have also opened another Kafka Table on this aforesaid TOPIC :-

ksql> create TABLE userprofile_table(userid VARCHAR PRIMARY KEY, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) WITH (KAFKA_TOPIC = 'USERPROFILE', VALUE_FORMAT = 'DELIMITED');

Questions are :-

  • Does it takes extra space on the Disk to open the KTable ? For example, Kafka topic have 100 million records, would the same records be also present in the KTable OR Is it just some virtual view on the underlying kafka topic ?

  • Same question for the stream that we have opened. Does it takes extra space on the Disk (of the Brokers servers) to open the KStream ? For example, Kafka topic have 100 million records, would the same records be also present in the KStream OR Is it just some virtual view on the underlying kafka topic ?

  • Say, we received record with id as 1001 on 1st May, then on 11th May, that record would no more be available on Kafka topic, But Whether that record would still be present on kstream / Ktable ? Are there some retention policy for KStream / KTable as well like we have for Topic as such ?

Answers shall be highly appreciated.

-- Best aditya

1

1 Answers

0
votes

ksqlDB server is powered by Kafka Streams. As a result, when you create a stream or a table, the server will create a KStream or KTable respectively.

On top of that KStream and KTables are backed up by topics in Kafka. As a result, creating streams and tables on a ksqlDB server will create actual topics on your Kafka cluster.

Having said that, the streams and tables from ksqlDB are materialized on need and pretty optimized, those two articles from Confluent gives more insights on the internal behaviour with good visual help:

You can even take a look by yourself at the created data. For the sake of example, I created:

  • A MESSAGES_STREAM stream from the original topic
  • A MATERIALIZED_MESSAGES_STTREAM from the stream above
  • A MESSAGES table from the first stream

Here are the creation commands for reference:

ksql> CREATE STREAM messages_stream (user_id BIGINT KEY, message VARCHAR) 
  WITH (KAFKA_TOPIC = 'hello_topic_json', VALUE_FORMAT='JSON');

ksql> CREATE STREAM materialized_messages_stream AS 
  SELECT user_id, UCASE(message) 
  FROM messages_stream 
EMIT CHANGES;

ksql> CREATE TABLE messages AS
  SELECT user_id, count(*) as msg_count
  FROM messages_stream
  GROUP BY user_id
EMIT CHANGES;

By looking at the details in ksqlDB, we can see that the first stream is using the original topic as source:

ksql> describe extended MESSAGES_STREAM;

Name                 : MESSAGES_STREAM
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : hello_topic_json (partitions: 1, replication: 1)
-- […]

ksql> describe extended MATERIALIZED_MESSAGES_STREAM;

Name                 : MATERIALIZED_MESSAGES_STREAM
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : MATERIALIZED_MESSAGES_STREAM (partitions: 1, replication: 1)
-- […]

ksql> describe extended MESSAGES;

Name                 : MESSAGES
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : MESSAGES (partitions: 1, replication: 1)
-- […]

And looking at the declared topic on our cluster, we can see the second stream, the table and its changelog topic created under the hood:

$ ./kafka-topics.sh --bootstrap-server localhost:29092 --list
MATERIALIZED_MESSAGES_STREAM
MESSAGES
__consumer_offsets
__transaction_state
_confluent-ksql-ksql_docker_command_topic
_confluent-ksql-ksql_dockerquery_CTAS_MESSAGES_1-Aggregate-Aggregate-Materialize-changelog
_schemas
hello_topic_json

You can also see that the retention policies are different between a stream and a table. The former will delete old records, while the latter will compact the data:

$ ./kafka-topics.sh --bootstrap-server localhost:29092 --topic MATERIALIZED_MESSAGES_STREAM --describe
Topic: MATERIALIZED_MESSAGES_STREAM PartitionCount: 1   ReplicationFactor: 1    Configs: cleanup.policy=delete

$ ./kafka-topics.sh --bootstrap-server localhost:29092 --topic MESSAGES --describe
Topic: MESSAGES PartitionCount: 1   ReplicationFactor: 1    Configs: cleanup.policy=compact

TL;DR, to go back on your questions:

  1. Yes it takes space to open a KTable, but that most likely won’t be a 1 to 1 mapping on used bytes.
  2. In your case, the stream will most likely use the topic as a reference and won’t take more space as you don’t have any data transformation happening.
  3. The retention policy on a table is compaction, so your entry will still be available on the table. On your stream, however, the data will be as available as on the reference topic.