1
votes

I use this SQL to create a session_id for a dataset. If a user is inactive for 30 minutes or more (30 * 60 seconds), a new session_id is assigned. I am new to Spark SQL and am trying to replicate the same procedure using a Spark SQLContext, but I'm encountering some errors.

session_id follows the naming convention:
userid_1,
userid_2,
userid_3,...
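
To make the rule concrete, here is a minimal plain-Scala sketch of the intended logic, with no Spark involved. The names Sessionize, Click, and assignSessionIds are hypothetical and exist only for illustration; the sketch assumes date is an epoch timestamp in seconds and that a gap of at least 30 * 60 seconds starts a new session, matching the >= in the SQL.

```scala
object Sessionize {
  // Hypothetical record type; `date` is an epoch timestamp in seconds.
  final case class Click(userid: String, date: Long)

  // Assigns session ids of the form userid_1, userid_2, ...
  // A new session starts at a user's first event or after a gap of at
  // least `gapSeconds` since that user's previous event.
  def assignSessionIds(clicks: Seq[Click], gapSeconds: Long = 30L * 60L): Seq[(Click, String)] =
    clicks
      .groupBy(_.userid)
      .toSeq
      .flatMap { case (userid, events) =>
        val sorted = events.sortBy(_.date)
        // Previous timestamp for each event; None for the user's first event.
        val previous: Seq[Option[Long]] = None +: sorted.init.map(c => Option(c.date))
        // 1 where a new session starts, 0 otherwise (the new_session flag).
        val flags = sorted.zip(previous).map {
          case (_, None)       => 1
          case (c, Some(prev)) => if (c.date - prev >= gapSeconds) 1 else 0
        }
        // The running sum of the flags is the session number.
        val sessionNumbers = flags.scanLeft(0)(_ + _).tail
        sorted.zip(sessionNumbers).map { case (c, n) => (c, s"${userid}_$n") }
      }
      .sortBy { case (c, _) => c.date }
}
```

The running sum over the flags plays the role of SUM(new_session) OVER (... ROWS UNBOUNDED PRECEDING), and the string interpolation plays the role of the concatenation.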

SQL (date is in seconds):

    CREATE TABLE tablename_with_session_id AS
        SELECT *,
               userid || '_' || SUM(new_session) OVER (PARTITION BY userid ORDER BY date ASC, new_session DESC ROWS UNBOUNDED PRECEDING) AS session_id
        FROM (
            SELECT *,
                   CASE
                       WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1
                       WHEN ROW_NUMBER() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1
                       ELSE 0
                   END AS new_session
            FROM tablename
        )
        ORDER BY date;

I tried using the same SQL in Spark-Scala with:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    val tableSessionID = sqlContext.sql("""
      SELECT *, CONCAT(userid,'_',SUM(new_session)) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS new_session_id
      FROM (SELECT *, CASE WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1 WHEN row_number() over (partition by userid order by date) = 1 THEN 1 ELSE 0 END as new_session FROM clickstream)
      order by date""")

This fails with an error suggesting that the Spark SQL expression ..sum(new_session).. be wrapped within a window function.

I then tried using multiple DataFrames:

val temp1 = sqlContext.sql("SELECT *, CASE WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1 WHEN row_number() over (partition by userid order by date) = 1 THEN 1 ELSE 0 END as new_session FROM clickstream")

temp1.registerTempTable("clickstream_temp1")

val temp2 = sqlContext.sql("SELECT * , SUM(new_session) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS s_id FROM clickstream_temp1")

temp2.registerTempTable("clickstream_temp2")

val temp3 = sqlContext.sql("SELECT * , CONCAT(userid,'_',s_id) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS new_session_id FROM clickstream_temp2")

Only the last statement ('val temp3 = ...') returns an error, saying that CONCAT(userid,'_',s_id) cannot be used within a window function.

What's the workaround? Is there an alternative?

Thanks

1
CONCAT(userid, '_', SUM(new_session) OVER (PARTITION BY ...)) - zero323
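
Expanding on this comment: the error appears to come from attaching OVER to CONCAT, which is neither an aggregate nor a window function, so the OVER clause would move onto SUM and CONCAT would wrap the finished window expression. Below is a minimal sketch of the rearranged query as a Scala string, assuming the clickstream temp table and the userid/date columns from the question. The object name SessionQuery, the subquery alias t, the explicit frame bounds, and the CAST are additions of this sketch, not part of the original query, and the statement has not been verified against any particular Spark version.

```scala
object SessionQuery {
  // Assumes a registered temp table `clickstream` with columns `userid` and
  // `date` (epoch seconds), as in the question. The alias `t` is hypothetical.
  val sql: String =
    """SELECT *,
      |       CONCAT(userid, '_', CAST(SUM(new_session) OVER (
      |           PARTITION BY userid
      |           ORDER BY date ASC, new_session DESC
      |           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS STRING)) AS new_session_id
      |FROM (
      |  SELECT *,
      |         CASE
      |           WHEN date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60 THEN 1
      |           WHEN ROW_NUMBER() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1
      |           ELSE 0
      |         END AS new_session
      |  FROM clickstream
      |) t
      |ORDER BY date""".stripMargin
}

// With a SQLContext in scope, as in the question:
// val tableSessionID = sqlContext.sql(SessionQuery.sql)
```

The same reasoning would apply to the three-step variant: since s_id is already computed in temp2, the last step should need only CONCAT(userid, '_', s_id) AS new_session_id, with no OVER clause.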

1 Answer

2
votes

To concatenate values across the rows of a Spark window, you need a user-defined aggregate function (UDAF). The built-in concat function cannot be used directly as a window function.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    // Extend UserDefinedAggregateFunction to write a custom aggregate function.
    // You can also specify constructor arguments, for instance
    // CustomConcat(arg1: Int, arg2: String).
    class CustomConcat extends UserDefinedAggregateFunction {
      // Schema of the input column(s)
      def inputSchema: StructType = StructType(Array(StructField("description", StringType)))
      // Schema of the intermediate aggregation buffer
      def bufferSchema: StructType = StructType(Array(StructField("groupConcat", StringType)))
      // Type of the returned value
      def dataType: DataType = StringType
      // The same input always produces the same output
      def deterministic: Boolean = true
      // Resets the buffer at the start of each group; starts from an empty string
      def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = "" }
      // Called once per input row of a group; null inputs are skipped
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) buffer(0) = buffer.getString(0) + input.getString(0)
      }
      // Merges two partial aggregates
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getString(0) + buffer2.getString(0)
      }
      // Called once all entries of the group have been processed
      def evaluate(buffer: Row): Any = buffer.getString(0)
    }

    // windowspec is assumed to be a WindowSpec defined elsewhere, and the
    // $"..." column syntax requires import sqlContext.implicits._
    val newdescription = new CustomConcat
    val newdesc1 = newdescription($"description").over(windowspec)

You can use newdesc1 as a column that concatenates values over the window. For more information, have a look at the Databricks documentation on UDAFs. I hope this answers your question.