Taken from Apache Spark, Dataset.scala (https://github.com/apache/spark/blob/0c47e274ab8c286498fa002e2c92febcb53905c6/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala)

Line 2132:

/**
  * Returns the number of rows in the [[Dataset]].
  * @group action
  * @since 1.6.0
  */
def count(): Long = withCallback("count", groupBy().count()) { df =>
  df.collect(needCallback = false).head.getLong(0)
}

Line 2393:

/**
  * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
  * user-registered callback functions.
  */
private def withCallback[U](name: String, df: DataFrame)(action: DataFrame => U) = {
  try {
    df.queryExecution.executedPlan.foreach { plan =>
      plan.resetMetrics()
    }
    val start = System.nanoTime()
    val result = action(df)
    val end = System.nanoTime()
    sqlContext.listenerManager.onSuccess(name, df.queryExecution, end - start)
    result
  } catch {
    case e: Exception =>
      sqlContext.listenerManager.onFailure(name, df.queryExecution, e)
      throw e
  }
}

What is going on here? I don't understand how count() can be both equal to a call to withCallback and have a body of its own. It looks as if the block is being invoked on the DataFrame returned by withCallback, but I don't understand the syntax.

1 Answer

The count() method doesn't actually have its own body. What looks like the body of count() is really a function literal that supplies the 'action' argument of withCallback. Strictly speaking, count() is just a call to the method withCallback(name, df)(action). (Methods can have multiple argument lists in Scala, and an argument list that holds a single argument may be written with braces instead of parentheses.) The value of that call, and therefore of count(), is result, which is whatever the action function evaluates to.
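To make the desugaring concrete, here is a minimal standalone sketch with no Spark involved. The names timed, log, countRows and countRowsExplicit are hypothetical stand-ins invented for illustration; timed only mimics the shape of withCallback (a first argument list of plain values, then a second argument list holding a function).

```scala
import scala.collection.mutable.ListBuffer

object CurriedCallDemo {
  // Hypothetical stand-in for Spark's listener manager: it only records names.
  val log = ListBuffer.empty[String]

  // Same shape as withCallback: (plain arguments)(a function to run).
  def timed[A, U](name: String, input: A)(action: A => U): U = {
    val result = action(input) // the "body" the caller wrote runs here
    log += name
    result
  }

  // Looks like a method with a body, but it is a single call to timed.
  def countRows(rows: Seq[String]): Long = timed("count", rows) { rs =>
    rs.size.toLong
  }

  // The same call with the braces replaced by ordinary parentheses.
  def countRowsExplicit(rows: Seq[String]): Long =
    timed("count", rows)((rs: Seq[String]) => rs.size.toLong)
}
```

Both definitions compile to the same kind of call; the braces in countRows are only a different way of writing the second argument list.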

However, the "confusion" you are experiencing is intentional. This idiom -- a method that has a terminal argument list whose type is either a function or a by-name value -- allows one to define what syntactically look like language extensions. We are accustomed to languages having special syntax like...

try {
  // your code here
}

In Scala, you can write your own function like...

// don't ask me why you would want to do this
def unreliably[T]( operation : =>T ) : Option[T] = {
   if (scala.util.Random.nextDouble() < 0.1) Some(operation) else None
}

...that users could call like

unreliably {
  // your code here
}

It looks just like new language syntax! To make it more like your motivating example, we could modify the definition to have two argument lists...

// don't ask me why you would want to do this
def unreliably[T]( probability : Double )( operation : =>T ) : Option[T] = {
   if (scala.util.Random.nextDouble() < probability) Some(operation) else None
}

Now, we could call the function as...

unreliably( probability = 0.9 ) {
  // your code here
}

...and there'd be a 90% chance that your code would get executed. The code defines an expression, not just some valueless statement, so you could also write

val result = unreliably( probability = 0.9 ) {
   "great day"
}

result would be of type Option[String], so you might follow that with...

println(s"""It's a ${result.getOrElse("terrible day")}.""")

Now, via your own little "language extension" (that's really just a funny way to call a function), you have a nice little program that makes you happy nine days out of ten.
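The =>T in the parameter list is what makes this work: a by-name argument is not evaluated at the call site, only when the method body actually refers to it. A small sketch of that behaviour, assuming a hypothetical variant unreliablyWith that takes the random draw as an explicit parameter so the outcome is repeatable (the counter evaluations is likewise just for illustration):

```scala
object ByNameDemo {
  // Hypothetical variant of unreliably: `draw` replaces the random number
  // so that the outcome is deterministic.
  def unreliablyWith[T](draw: Double)(probability: Double)(operation: => T): Option[T] =
    if (draw < probability) Some(operation) else None

  var evaluations = 0 // counts how often a block actually runs

  // draw < probability, so the block is evaluated once.
  val lucky = unreliablyWith(draw = 0.5)(probability = 0.9) {
    evaluations += 1
    "great day"
  }

  // draw >= probability, so the block is never evaluated.
  val unlucky = unreliablyWith(draw = 0.95)(probability = 0.9) {
    evaluations += 1
    "great day"
  }
}
```

Had operation been declared as a plain T instead of =>T, both blocks would run before the method was even entered, and the "maybe run this code" effect would be lost.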