Hi, I am trying out KSQL and found that most of the documentation gives examples of direct joins with a KStream or KTable. Let's say I have two streams already created by team X; I can't change or recreate them, and I want to use the existing streams. By default, ROWTIME is used as the timestamp column when joining. Is there any way to specify a column other than ROWTIME as the timestamp column for existing streams, so that its time windows are used for joins in KSQL?
1 Answer
You can specify any valid column using the WITH (TIMESTAMP='column_name') syntax. For example:
CREATE STREAM ORDERS (ORDER_ID INT,
                      CUSTOMER_ID INT,
                      ORDER_TS VARCHAR,
                      ORDER_TOTAL_USD DOUBLE,
                      MAKE VARCHAR)
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON',
        TIMESTAMP='ORDER_TS', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX');
This will use the ORDER_TS column as the timestamp in any time-related KSQL operations (e.g. windowed joins and aggregations). You can validate this by comparing the ROWTIME value to ORDER_TS and observing that they represent the same point in time.
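If the existing stream itself cannot be redefined, one possible approach is to derive a new stream from it and set the timestamp column on the derived stream. The following is only a sketch under assumptions: ORDERS_X and SHIPMENTS_BY_EVENT_TIME are hypothetical stream names, SHIPMENT_ID is a hypothetical column, and the existing stream is assumed to carry its event time in a VARCHAR column ORDER_TS in the format shown above.

```sql
-- Hypothetical: ORDERS_X is the existing stream owned by another team.
-- Derive a new stream whose ROWTIME is taken from ORDER_TS.
CREATE STREAM ORDERS_BY_EVENT_TIME
  WITH (TIMESTAMP='ORDER_TS',
        TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX') AS
  SELECT * FROM ORDERS_X;

-- Check the result: ROWTIME is epoch milliseconds, so format it
-- before comparing it with the ORDER_TS string.
-- (Newer ksqlDB versions need EMIT CHANGES before LIMIT here.)
SELECT ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd''T''HH:mm:ssX') AS ROWTIME_STR,
       ORDER_TS
FROM ORDERS_BY_EVENT_TIME
LIMIT 5;

-- A windowed stream-stream join then uses the ORDER_TS-based ROWTIME.
-- SHIPMENTS_BY_EVENT_TIME is assumed to be derived in the same way.
SELECT O.ORDER_ID, S.SHIPMENT_ID
FROM ORDERS_BY_EVENT_TIME O
  INNER JOIN SHIPMENTS_BY_EVENT_TIME S WITHIN 1 HOURS
  ON O.ORDER_ID = S.ORDER_ID;
```

The derived stream writes to its own Kafka topic, so the original stream and its topic are left untouched. The formatted ROWTIME may show a different UTC offset than ORDER_TS depending on the server time zone, while still denoting the same instant.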
References:
- https://www.confluent.io/blog/data-wrangling-apache-kafka-ksql (section Managing message timestamps in KSQL)
- https://rmoff.net/2019/03/28/exploring-ksql-stream-stream-joins/