I am grouping events from a Kafka topic by one of their properties and over time, using KSQL windowed aggregation, specifically a session window.

I have been able to create a stream of "session start signals" as described in this answer.

-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT) 
    WITH (kafka_topic='data', value_format='json', partitions=2);

-- create a table that tracks user interactions per session:
CREATE TABLE SESSIONS AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
  FROM DATA
WINDOW SESSION (5 SECONDS)
   GROUP BY USER_ID;

-- Create a stream over the existing `SESSIONS` topic.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT) 
   WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');

-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS 
    SELECT * FROM SESSION_STREAM 
    WHERE WINDOWSTART = WINDOWEND;

Would it be possible to create a stream of "session end signals" that emits every time a session window of the windowed aggregation ends?

1 Answer

I'm assuming by this you mean you want to emit an event/row when a session window hasn't seen any new messages that fit into the session for the 5 seconds you've configured for the window?

I don't think this is possible at present.

Because the source data can contain out-of-order records, i.e. an event with a timestamp much earlier than rows already processed, a session window cannot be 'closed' as soon as the 5 SECONDS inactivity gap has elapsed.

Existing sessions will, by default, be closed after 24 hours if no new data is received that should be included in the session. This can be controlled by setting a GRACE PERIOD in the window definition.
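
As a rough sketch of what that looks like, the session window from the question could be declared with an explicit grace period. This assumes a ksqlDB version that accepts GRACE PERIOD inside the WINDOW clause; the table name SESSIONS_WITH_GRACE, the column alias EVENT_COUNT and the 10 SECONDS value are only illustrative, not something from the question.

```sql
-- Hypothetical variant of the SESSIONS table from the question.
-- Assumption: this ksqlDB version supports GRACE PERIOD in the WINDOW clause.
CREATE TABLE SESSIONS_WITH_GRACE AS
SELECT USER_ID, COUNT(*) AS EVENT_COUNT
  FROM DATA
WINDOW SESSION (5 SECONDS, GRACE PERIOD 10 SECONDS)
   GROUP BY USER_ID;
```

With a definition like this, a session stops accepting late records once stream time has moved 10 seconds past the end of the session, rather than after the default 24 hours. It only changes when the window closes; it does not by itself produce a "session end" row.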

Closing a window once the grace period has elapsed does not result in any row being output at present. However, KLIP 10 - Add Suppress to KSQL may give you what you want once it is implemented.