Taken from Apache Spark, Dataset.scala (https://github.com/apache/spark/blob/0c47e274ab8c286498fa002e2c92febcb53905c6/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala)
Line 2132:
/**
 * Returns the number of rows in the [[Dataset]].
 * @group action
 * @since 1.6.0
 */
def count(): Long = withCallback("count", groupBy().count()) { df =>
  df.collect(needCallback = false).head.getLong(0)
}
Line 2393:
/**
 * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
 * user-registered callback functions.
 */
private def withCallback[U](name: String, df: DataFrame)(action: DataFrame => U) = {
  try {
    df.queryExecution.executedPlan.foreach { plan =>
      plan.resetMetrics()
    }
    val start = System.nanoTime()
    val result = action(df)
    val end = System.nanoTime()
    sqlContext.listenerManager.onSuccess(name, df.queryExecution, end - start)
    result
  } catch {
    case e: Exception =>
      sqlContext.listenerManager.onFailure(name, df.queryExecution, e)
      throw e
  }
}
What is going on here? I don't understand how count() can be defined as equal to a call to withCallback and also be followed by a body in braces. It looks as if the body is being called on the DataFrame returned by withCallback, but I don't understand the syntax.
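To isolate the syntax, here is a minimal sketch with the same shape as count() and withCallback above, stripped of Spark. Everything in it is a hypothetical stand-in: `Demo`, `rows`, `withTiming`, and `countExplicit` do not exist in Spark, and a `List[String]` takes the place of the DataFrame. It assumes the construct in question is a method with two parameter lists, where the second list takes a function and the braces after the call supply that function.

```scala
object Demo {
  // Hypothetical stand-in for the DataFrame: just a list of rows.
  val rows: List[String] = List("a", "b", "c")

  // Two parameter lists, like withCallback: the second one takes a function.
  def withTiming[U](name: String, data: List[String])(action: List[String] => U): U = {
    val start = System.nanoTime()
    val result = action(data) // run the caller-supplied block on `data`
    val elapsed = System.nanoTime() - start
    println(s"$name took $elapsed ns")
    result
  }

  // Same shape as Spark's count(): the braces are the second argument list,
  // a function literal whose parameter is `data`.
  def count(): Long = withTiming("count", rows) { data =>
    data.size.toLong
  }

  // The same call written with parentheses and an explicit function literal.
  def countExplicit(): Long =
    withTiming("count", rows)((data: List[String]) => data.size.toLong)
}
```

If that reading is right, `count()` and `countExplicit()` are equivalent: in both, the block is passed into `withTiming` as `action`, and `withTiming` decides when to call it. Nothing is called on a value returned by `withTiming`.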