I am grouping events coming from a kafka topic by one of its properties and over time using the KSQL Windowed Aggregation, specifically the Session Window.
I have been able to create a stream of "session start signals" as described in this answer.
-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT)
WITH (kafka_topic='data', value_format='json', partitions=2);
-- create a table that tracks user interactions per session:
CREATE TABLE SESSION AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
FROM DATA
WINDOW SESSION (5 SECONDS)
GROUP BY USER_ID;
-- Create a stream over the existing `SESSIONS` topic.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS
SELECT * FROM SESSION_STREAM
WHERE WINDOWSTART = WINDOWEND;
Would be possible to create a stream of "session end signals" every time a the Windowed Aggregation ends?