
I have a topic called customers, and I have created a stream for it:

CREATE STREAM customers_stream (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='json');

My producer for the customers topic generates an Integer key and a JSON value. But when I print the topic, the row key is set to some binary value:

ksql> print 'customers';
Format:JSON
{"ROWTIME":1570305904984,"ROWKEY":"\u0000\u0000\u0003�","customerId":1001,"isActive":true}
{"ROWTIME":1570307584257,"ROWKEY":"\u0000\u0000\u0003�","customerId":1002,"isActive":true}

Now if I create a table, it results in a single row (maybe because the row key is the same?):

CREATE TABLE customers (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='customers', KEY='customerId',VALUE_FORMAT='json');

After searching the web, I came across this article https://www.confluent.io/stream-processing-cookbook/ksql-recipes/setting-kafka-message-key and created a new stream by repartitioning on the key:

CREATE STREAM customers_stream2 AS \
 SELECT * FROM customers_stream \
 PARTITION BY customerId;

So how do I create a table that holds the latest values of the customer data?

Creating a table from the stream results in an error:

CREATE TABLE customers_2_table_active AS
  SELECT CUSTOMERID,ISACTIVE
  FROM customers_stream2;

Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.

I need the latest value for each customer row so that another microservice can query the new table.

Thank you in advance


1 Answer


Rekeying seems to be the right approach; however, you cannot convert a STREAM into a TABLE directly.

Note that your rekeyed stream customers_stream2 is written to a corresponding topic. Hence, you should be able to create a new TABLE from the stream's topic to get the latest value per key.
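
A minimal sketch of what that could look like, assuming the same KSQL version as in the question (one that still accepts the KEY property) and assuming the rekeyed stream was written to the default topic name, which is normally the upper-cased stream name CUSTOMERS_STREAM2. The table name customers_table is only illustrative; check the actual topic name with SHOW TOPICS; or DESCRIBE EXTENDED customers_stream2; before running this.

```sql
-- Read from the beginning of the topic so existing rows are picked up.
SET 'auto.offset.reset' = 'earliest';

-- Assumption: CUSTOMERS_STREAM2 is the topic backing customers_stream2.
-- Verify the real name first; the table name here is hypothetical.
CREATE TABLE customers_table (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='CUSTOMERS_STREAM2', KEY='customerId', VALUE_FORMAT='json');

-- Each customerId should now show only its most recent value.
SELECT ROWKEY, customerId, isActive FROM customers_table;
```

Because the rekeyed topic is keyed by customerId, rows for different customers no longer share the same key, so the table should hold one row per customer instead of a single row.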