I use this SQL to create a session_id for a dataset. If a user is inactive for 30 minutes or more (30*60 seconds), a new session_id is assigned. I am new to Spark SQL and am trying to replicate the same procedure using the Spark SQLContext, but I'm encountering some errors.
session_id follows the naming convention:
userid_1,
userid_2,
userid_3,...
SQL (date is in seconds):
CREATE TABLE tablename_with_session_id AS
SELECT *,
       userid || '_' || SUM(new_session) OVER (PARTITION BY userid ORDER BY date ASC, new_session DESC ROWS UNBOUNDED PRECEDING) AS session_id
FROM
  (SELECT *,
          CASE
            WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60)
              THEN 1
            WHEN ROW_NUMBER() OVER (PARTITION BY userid ORDER BY date) = 1
              THEN 1
            ELSE 0
          END AS new_session
   FROM tablename
  )
ORDER BY date;
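To make the intended result concrete, here is a minimal plain-Scala sketch (no Spark) of what the query above is meant to compute. The `Click` case class and the `withSessionIds` helper are hypothetical names used only for illustration; they stand in for the table rows and are not part of my actual code.

```scala
// Hypothetical in-memory stand-in for one table row: (userid, date in seconds).
case class Click(userid: String, date: Long)

// Inactivity threshold from the SQL above: 30 * 60 seconds.
val gapSeconds = 30 * 60L

def withSessionIds(clicks: Seq[Click]): Seq[(Click, String)] =
  clicks
    .groupBy(_.userid)               // PARTITION BY userid
    .toSeq
    .flatMap { case (user, rows) =>
      val sorted = rows.sortBy(_.date).toVector   // ORDER BY date
      // new_session: 1 for a user's first row or after a gap >= 30 minutes, else 0.
      val flags = sorted.zipWithIndex.map { case (c, i) =>
        if (i == 0 || c.date - sorted(i - 1).date >= gapSeconds) 1 else 0
      }
      // Running sum of the flags is the per-user session counter.
      val counters = flags.scanLeft(0)(_ + _).tail
      sorted.zip(counters).map { case (c, n) => (c, s"${user}_$n") }
    }
    .sortBy(_._1.date)               // final ORDER BY date
```

For example, clicks by one user at 0, 100 and 1900 seconds would get ids ending in _1, _1 and _2, because the gap between the last two clicks is exactly 30 * 60 seconds.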
I tried using the same SQL in Spark-Scala with:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val tableSessionID = sqlContext.sql("""SELECT * , CONCAT(userid,'_',SUM(new_session)) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS new_session_id FROM
(SELECT *, CASE WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1 WHEN row_number() over (partition by userid order by date) = 1 THEN 1 ELSE 0 END as new_session FROM clickstream) order by date""")
This fails with an error suggesting that the Spark SQL expression ..sum(new_session).. should be wrapped within a window function.
I then tried using multiple DataFrames:
val temp1 = sqlContext.sql("SELECT *, CASE WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1 WHEN row_number() over (partition by userid order by date) = 1 THEN 1 ELSE 0 END as new_session FROM clickstream")
temp1.registerTempTable("clickstream_temp1")
val temp2 = sqlContext.sql("SELECT * , SUM(new_session) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS s_id FROM clickstream_temp1")
temp2.registerTempTable("clickstream_temp2")
val temp3 = sqlContext.sql("SELECT * , CONCAT(userid,'_',s_id) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS new_session_id FROM clickstream_temp2")
Only the last statement ('val temp3 = ...') returns an error, saying that CONCAT(userid,'_',s_id) cannot be used within a window function.
What's the workaround? Is there an alternative?
Thanks
CONCAT(userid, '_', SUM(new_session) OVER (PARTITION BY ...))
– zero323
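Following the comment above, a sketch of the single-statement version could look like the block below: the OVER clause is attached to SUM(new_session) inside CONCAT, so CONCAT itself is an ordinary function call rather than a window function. The name `sessionQuery`, the subquery alias `t`, the explicit CAST to STRING, and the spelled-out ROWS BETWEEN frame are assumptions added for illustration. This is untested against the real clickstream table.

```scala
// Sketch only: assumes the registered table `clickstream` with columns
// `userid` and `date` (in seconds), as in the question.
val sessionQuery = """
  SELECT *,
         CONCAT(userid, '_',
                CAST(SUM(new_session) OVER (PARTITION BY userid
                                            ORDER BY date ASC, new_session DESC
                                            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
                     AS STRING)) AS new_session_id
  FROM (
    SELECT *,
           CASE
             WHEN date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60 THEN 1
             WHEN ROW_NUMBER() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1
             ELSE 0
           END AS new_session
    FROM clickstream
  ) t
  ORDER BY date
"""

// With the question's sqlContext in scope (assumed, not created here):
// val tableSessionID = sqlContext.sql(sessionQuery)
```

With this shape the intermediate temp tables should no longer be needed. On older Spark 1.x releases, window functions may additionally need a HiveContext rather than a plain SQLContext.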