2
votes

Is there an example somewhere, or can someone explain, how to use Kinesis Analytics to construct real-time sessions (i.e., sessionization)?

It is mentioned that this is possible here: https://aws.amazon.com/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/ in the discussion of custom windows, but the post does not give an example.

Typically this is done in SQL using the LAG function so you can compute the time difference between consecutive rows. This post: https://blog.modeanalytics.com/finding-user-sessions-sql/ describes how to do it with conventional (non-streaming) SQL. However, I don't see support for the LAG function in Kinesis Analytics.
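To make the LAG idea concrete, here is a minimal Python sketch of the same logic outside of any SQL engine. It is not Kinesis Analytics code; the function name `assign_sessions`, the tuple layout, and the use of timestamps in seconds are all assumptions made for illustration. The variable `prev_ts` plays the role of LAG(timestamp) partitioned by user.

```python
from itertools import groupby
from operator import itemgetter

SESSION_GAP_SECONDS = 5 * 60  # assumed threshold: 5 minutes, in seconds


def assign_sessions(events, gap=SESSION_GAP_SECONDS):
    """Tag each (user_id, timestamp) pair with a per-user session index.

    Hypothetical helper: a new session starts whenever the gap to the
    previous event of the same user is at least `gap` seconds.
    """
    out = []
    # Sort by user, then by time, like PARTITION BY user_id ORDER BY timestamp.
    ordered = sorted(events, key=itemgetter(0, 1))
    for user_id, rows in groupby(ordered, key=itemgetter(0)):
        prev_ts = None  # what LAG(timestamp) would return for the current row
        session_index = 0
        for _, ts in rows:
            if prev_ts is not None and ts - prev_ts >= gap:
                session_index += 1  # gap too large: start a new session
            out.append((user_id, ts, session_index))
            prev_ts = ts
    return out
```

This works on a complete, sorted batch of rows, which is exactly what a streaming engine does not have; that is why the question is about an equivalent for streams.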

In particular, I would love two examples. Assume that both take as input a stream consisting of a user_id and a timestamp. Define a session as a sequence of events from the same user separated by less than 5 minutes.

1) The first outputs a stream that has the additional columns event_count and session_start_timestamp. Every time an event comes in, this should output an event with these two additional columns.

2) The second example would be a stream that outputs a single event per session once the session has ended (i.e., 5 minutes have passed with no data from a user). This event would have userId, start_timestamp, end_timestamp, and event_count.
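To pin down the expected behavior of both outputs, here is a rough Python sketch of the semantics I have in mind. It is not Kinesis Analytics code; the `Sessionizer` class, its method names, and the assumption that timestamps are in seconds and arrive in non-decreasing order are mine, purely for illustration.

```python
from dataclasses import dataclass

SESSION_GAP_SECONDS = 5 * 60  # assumed threshold: 5 minutes, in seconds


@dataclass
class Session:
    user_id: str
    start_timestamp: float
    end_timestamp: float
    event_count: int


class Sessionizer:
    """Hypothetical in-memory sessionizer; assumes non-decreasing timestamps."""

    def __init__(self, gap=SESSION_GAP_SECONDS):
        self.gap = gap
        self.open_sessions = {}  # user_id -> Session

    def close_expired(self, now):
        """Output 2: remove and return sessions idle for at least `gap`."""
        expired = [s for s in self.open_sessions.values()
                   if now - s.end_timestamp >= self.gap]
        for s in expired:
            del self.open_sessions[s.user_id]
        return expired

    def on_event(self, user_id, timestamp):
        """Output 1: the event enriched with event_count and session start.

        Also returns any sessions that ended by `timestamp`, so a stale
        session is emitted rather than silently overwritten.
        """
        closed = self.close_expired(timestamp)
        session = self.open_sessions.get(user_id)
        if session is None:
            session = Session(user_id, timestamp, timestamp, 0)
            self.open_sessions[user_id] = session
        session.end_timestamp = timestamp
        session.event_count += 1
        row = (user_id, timestamp, session.event_count, session.start_timestamp)
        return row, closed
```

In this sketch a session only closes when a later event or an explicit `close_expired(now)` call advances the clock, so a real deployment would need some periodic tick to emit the final session of an idle user.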

Is this possible with Kinesis Analytics?

Here is an example of doing this with Apache Spark: https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/Applications/01%20Sessionization.html

But I would love to do this with one (or two) Kinesis Analytics streams.

2 Answers

0
votes

You can do this using Drools by creating the following logic:

Types:

package com.test;

import java.util.List;

declare EventA
    @role( event )
    userId:String;
    seen:boolean;
end

declare SessionStart
    userId: String;
    timestamp: long;
    events: List;
end

declare SessionEnd
    userId: String;
    timestamp: long;
    numOfEvents: int;
end

declare SessionNotification
    userId: String;
    currentNumOfEvents: int;
end

Rules:

package com.test;

import java.util.List;
import java.util.ArrayList;

rule "Start session"
when
    // for any EventA
    $a : EventA() from entry-point events
    // check session is not started for this userId
    not (exists(SessionStart(userId == $a.userId)))
then
    modify($a){setSeen(true);}
    List events = new ArrayList();
    events.add($a);
    insert(new SessionStart($a.getUserId(), System.currentTimeMillis(), events));
end

rule "join session"
when
    // for every new EventA
    $a : EventA(seen == false) from entry-point events
    // get event's session
    $session: SessionStart(userId == $a.userId)
then
    $session.getEvents().add($a);
    insertLogical(new SessionNotification($a.getUserId(), $session.getEvents().size()));
    modify($a) {setSeen(true);}

end

rule "End session"
// if session timed out, clean up first
salience 5
when
    // for any EventA
    $a : EventA() from entry-point events
    // check there is no following EventA with same userId within 30 seconds
    // (the question asks for a 5-minute gap; the interval would then be [0, 5m])
    not (exists(EventA(this != $a, userId == $a.userId, this after[0, 30s] $a)
        from entry-point events))
    // get event's session
    $session: SessionStart(userId == $a.userId)
then
    insertLogical(new SessionEnd($a.getUserId(), System.currentTimeMillis(),
        $session.getEvents().size()));

    // cleanup
    for (Object $x : $session.getEvents())
        delete($x);
    delete($session);
end

You can author Drools rules for Kinesis Analytics with this service.

0
votes

Kinesis Analytics now supports LAG. See the documentation page: http://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-lag.html. I have actually used it for a use case similar to the one you describe.