I have found that in Apache Spark SQL (version 2.2.0), when a user-defined aggregate function (UDAF) used over a window specification is supplied with multiple rows of identical input, the UDAF's evaluate method is (seemingly) not called correctly.
I have been able to reproduce this behavior both in Java and Scala, locally and on a cluster. The code below shows an example where a row is marked as false if it is within 1 second of the previous row.
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class ExampleUDAF(val timeLimit: Long) extends UserDefinedAggregateFunction {
  override def deterministic: Boolean = true
  override def inputSchema: StructType = StructType(Array(StructField("unix_time", LongType)))
  override def dataType: DataType = BooleanType
  override def bufferSchema: StructType = StructType(Array(
    StructField("previousKeepTime", LongType),
    StructField("keepRow", BooleanType)
  ))
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = false
  }
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.getLong(0) == 0L) {
      // First row seen in this frame: keep it and remember its time.
      buffer(0) = input.getLong(0)
      buffer(1) = true
    } else {
      val timeDiff = input.getLong(0) - buffer.getLong(0)
      if (timeDiff < timeLimit) {
        buffer(1) = false
      } else {
        buffer(0) = input.getLong(0)
        buffer(1) = true
      }
    }
  }
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {} // Not implemented
  override def evaluate(buffer: Row): Boolean = buffer.getBoolean(1)
}
import java.util.Arrays

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.column
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val timeLimit = 1000L // 1 second
val udaf = new ExampleUDAF(timeLimit)

val window = Window
  .orderBy(column("unix_time"))
  .partitionBy(column("category"))

val df = spark.createDataFrame(Arrays.asList(
  Row(1510000001000L, "a", true),
  Row(1510000001000L, "a", false),
  Row(1510000001000L, "a", false),
  Row(1510000001000L, "a", false),
  Row(1510000700000L, "a", true),
  Row(1510000700000L, "a", false)
), new StructType().add("unix_time", LongType).add("category", StringType).add("expected_result", BooleanType))

df.withColumn("actual_result", udaf(column("unix_time")).over(window)).show
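For reference, the window above relies on Spark's default frame, which (as I understand it) is range-based when an ORDER BY is present, i.e. RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. An explicit row-based frame can be requested instead; this is only a sketch, and I have not verified whether it changes the result here:

```scala
// Sketch: same window, but with an explicit row-based frame, so that
// rows sharing the same unix_time value each get their own frame boundary.
val rowWindow = Window
  .partitionBy(column("category"))
  .orderBy(column("unix_time"))
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("actual_result", udaf(column("unix_time")).over(rowWindow)).show
```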
Below is the output of running the code above. The first row is expected to have an actual_result value of true, as there is no prior data. When the unix_time input is modified so that there is 1 millisecond between each record, the UDAF works as expected.
Adding print statements to the UDAF methods revealed that evaluate is called only once, at the very end, and that the buffer was correctly updated to true in the update method, yet this is not what is returned after the UDAF completes.
+-------------+--------+---------------+-------------+
| unix_time|category|expected_result|actual_result|
+-------------+--------+---------------+-------------+
|1510000001000|       a|           true|        false| // Should be true as it is the first element
|1510000001000| a| false| false|
|1510000001000| a| false| false|
|1510000001000| a| false| false|
|1510000700000| a| true| false| // Should be true as more than 1000 milliseconds between self and previous
|1510000700000| a| false| false|
+-------------+--------+---------------+-------------+
Am I understanding Spark's UDAF behavior correctly when it is used over window specifications? If not, could anyone offer any insight into this area? If my understanding of UDAF behavior over windows is correct, could this be a bug in Spark? Thank you.
Compute a row_number (ordered by unix_time), then use this row_number as the ordering column in your window specification instead of unix_time. It seems like the problem is that the order is not clearly defined. – Raphael Roth
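The comment's suggestion could be sketched as follows (untested; column names are those from the example above):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{column, row_number}

// Derive a tie-free ordering column, then order the UDAF's window by it
// so that no two rows share the same ordering value.
val byTime = Window.partitionBy(column("category")).orderBy(column("unix_time"))
val withRowNum = df.withColumn("row_num", row_number().over(byTime))

val byRowNum = Window.partitionBy(column("category")).orderBy(column("row_num"))
withRowNum.withColumn("actual_result", udaf(column("unix_time")).over(byRowNum)).show
```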