I have a kind of event stream that looks like this:
Time UserId SessionId EventType EventData
1 2 A Load /a ...
2 1 B Impressn X ...
3 2 A Impressn Y ...
4 1 B Load /b ...
5 2 A Load /info ...
6 1 B Load /about ...
7 2 A Impressn Z ...
In practice, users can have many sessions over longer time windows, and there is also a click event type, but to keep this simple: I'm trying to see which (page view) loads lead to the next load, and also which impressions happened, in aggregate.
So, without SQL, I've loaded this, grouped it by user, sequenced it by time, and, within each session, marked each row with the previous load's info (if any), using:
import sparkSession.implicits._ // encoders needed by groupByKey / flatMapGroups
val outDS = logDataset.groupByKey(_.UserId)
  .flatMapGroups((_, iter) => gather(iter))
where gather sorts iter by time (possibly redundant, since the input is already sorted by time), then walks the sequence: it resets lastLoadData to null at the start of each new session, adds lastLoadData to each row, and, if the row is a Load, updates lastLoadData to that row's data. This produces something like:
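A minimal sketch of gather as described, assuming hypothetical case classes Event and EventWithLastLoad whose fields mirror the columns above (the real types aren't shown). One deliberate deviation: it sorts by (SessionId, Time) rather than Time alone, so a session change is still detected even if two of a user's sessions overlap in time.

```scala
// Hypothetical row types; field names follow the column headers above.
case class Event(Time: Long, UserId: Int, SessionId: String,
                 EventType: String, EventData: String)
case class EventWithLastLoad(Time: Long, UserId: Int, SessionId: String,
                             EventType: String, EventData: String,
                             lastLoadData: String) // null when the session has no earlier Load

// One user's events in; the same events tagged with the previous Load's data out.
def gather(iter: Iterator[Event]): Iterator[EventWithLastLoad] = {
  // Sort by (SessionId, Time) so each session's events are contiguous and in time order.
  // toVector makes everything strict, so the mutable state below updates in row order.
  val sorted = iter.toVector.sortBy(e => (e.SessionId, e.Time))
  var currentSession: String = null
  var lastLoadData: String = null
  sorted.map { e =>
    if (e.SessionId != currentSession) { // new session: forget the previous Load
      currentSession = e.SessionId
      lastLoadData = null
    }
    val tagged = EventWithLastLoad(e.Time, e.UserId, e.SessionId,
                                   e.EventType, e.EventData, lastLoadData)
    if (e.EventType == "Load") lastLoadData = e.EventData // visible to later rows only
    tagged
  }.iterator
}
```

Because the current row is tagged before lastLoadData is updated, a Load row gets the Load before it, never itself. For use inside flatMapGroups, Spark's encoders expect these case classes to be defined at top level rather than inside a method.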
Time UserId SessionId EventType EventData LastLoadData
1 2 A Load /a ... null
2 1 B Impressn X ... null
3 2 A Impressn Y ... /a ...
4 1 B Load /b ... null
5 2 A Load /info ... /a ...
6 1 B Load /about ... /b ...
7 2 A Impressn Z ... /info ...
This then lets me aggregate which (page view) loads lead to which other loads, or, for each page load, find the top 5 Impressn events.
outDS.createOrReplaceTempView(tempTable)
val journeyPageViews = sparkSession.sql(
s"""SELECT lastLoadData, EventData,
| count(distinct UserId) as users,
| count(distinct SessionId) as sessions
|FROM ${tempTable}
|WHERE EventType='Load'
|GROUP BY lastLoadData, EventData""".stripMargin)
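The query above covers load-to-load transitions. For the other aggregation (top 5 Impressn events per page load), here is the grouping logic as a small plain-Scala sketch over in-memory rows, not Spark; TaggedEvent and topImpressionsPerLoad are hypothetical names, and counting raw events (rather than distinct users or sessions) is an assumption.

```scala
// Hypothetical minimal row: just the columns this aggregation needs.
case class TaggedEvent(EventType: String, EventData: String, lastLoadData: String)

// For each page (previous Load), the n most frequent Impressn values that followed it,
// counted per event (like count(*)), ties broken alphabetically.
def topImpressionsPerLoad(rows: Seq[TaggedEvent], n: Int): Map[String, Seq[(String, Int)]] =
  rows
    .filter(r => r.EventType == "Impressn" && r.lastLoadData != null) // skip impressions before any Load
    .groupBy(_.lastLoadData)
    .map { case (page, evs) =>
      val counts = evs.groupBy(_.EventData).map { case (imp, xs) => (imp, xs.size) }.toSeq
      page -> counts.sortBy { case (imp, c) => (-c, imp) }.take(n)
    }
```

In Spark SQL the same shape would be a GROUP BY lastLoadData, EventData in a subquery, followed by a row_number() window partitioned by lastLoadData and ordered by the count descending, filtered to rank <= 5.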
But I suspect the lastLoadData column could also be added using Spark SQL window functions. However, I'm hung up on two parts of that:
- If I make a window over UserId+SessionId ordered by Time, how do I have it apply to all events but look only at the previous Load event? (E.g., an Impressn row would get a new lastLoadData column set to the EventData of the window's previous Load.)
- If I somehow make a new window per Load event within a session (also not sure how), the Load event at the start of each window (presumably "first") should get the lastLoadData of the previous window's "first", so that's probably not the right way to do it either.
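A sketch of one window-based approach, assuming Scala 2.12/2.13 with spark-sql on the classpath and the column names from the question. Instead of one window per Load, it keeps a single window per UserId+SessionId ordered by Time, restricts the frame to rows strictly before the current one, and takes the last non-null value of an expression that is EventData on Load rows and null otherwise. That covers both cases: Impressn rows see the most recent Load, and a Load row sees the Load before it rather than itself. The local SparkSession and sample DataFrame are for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last, when}

// Local session for trying this out; in the question this would be the existing sparkSession.
val spark = SparkSession.builder().master("local[1]").appName("last-load-sketch").getOrCreate()
import spark.implicits._

// The sample stream from the question (EventData shortened to the page or impression id).
val events = Seq(
  (1, 2, "A", "Load", "/a"),
  (2, 1, "B", "Impressn", "X"),
  (3, 2, "A", "Impressn", "Y"),
  (4, 1, "B", "Load", "/b"),
  (5, 2, "A", "Load", "/info"),
  (6, 1, "B", "Load", "/about"),
  (7, 2, "A", "Impressn", "Z")
).toDF("Time", "UserId", "SessionId", "EventType", "EventData")

// All earlier rows of the same session; the frame end of -1 excludes the current row,
// so a Load never picks up its own EventData.
val priorInSession = Window
  .partitionBy("UserId", "SessionId")
  .orderBy("Time")
  .rowsBetween(Window.unboundedPreceding, -1)

// EventData on Load rows, null on every other row (when without otherwise yields null).
val loadDataOrNull = when(col("EventType") === "Load", col("EventData"))

// Last non-null value in the frame = data of the most recent earlier Load in the session.
val withLastLoad = events.withColumn(
  "lastLoadData",
  last(loadDataOrNull, ignoreNulls = true).over(priorInSession))

withLastLoad.orderBy("Time").show()
```

Because the partition is the session, lastLoadData resets at each new session without any explicit bookkeeping. Registering withLastLoad with createOrReplaceTempView should let the journeyPageViews query above run unchanged against it.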