3
votes

I'm trying to compute the largest value of the following DataFrame in Spark 1.6.1:

val df = sc.parallelize(Seq(1,2,3)).toDF("id")

A first approach would be to select the maximum value, and it works as expected:

df.select(max($"id")).show
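
For reference, on this three-row DataFrame the output should look roughly as follows (the exact column name may vary slightly between Spark versions):

+-------+
|max(id)|
+-------+
|      3|
+-------+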

The second approach could be to use withColumn as follows:

df.withColumn("max", max($"id")).show

But unfortunately it fails with the following error message:

org.apache.spark.sql.AnalysisException: expression 'id' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;

How can I compute the maximum value in a withColumn function without any Window or groupBy? If not possible, how can I do it in this specific case using a Window?

3
How would you write what you are trying to achieve in plain old SQL? If there is a way, it's close to that. (I personally do not see intuitively how one can express a max without some kind of aggregation first: either a group by, which is what your second attempt expects, or a subquery, which is what your first working sample does.) So I guess the error is only natural if you think in terms of SQL. - GPI

3 Answers

2
votes

The right approach is to compute an aggregate as a separate query and combine it with the actual result. Unlike the window functions suggested in many answers here, it won't require a shuffle to a single partition and is applicable to large datasets.

It could be done with withColumn using a separate action:

import org.apache.spark.sql.functions.{lit, max}

df.withColumn("max", lit(df.agg(max($"id")).as[Int].first))
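
Here df.agg(max($"id")).as[Int].first runs eagerly as a separate job, so lit embeds a plain Int. On the sample data the result should look roughly like this:

+---+---+
| id|max|
+---+---+
|  1|  3|
|  2|  3|
|  3|  3|
+---+---+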

but it is much cleaner to use either an explicit cross join:

import org.apache.spark.sql.functions.broadcast

df.crossJoin(broadcast(df.agg(max($"id") as "max")))

or an implicit one:

spark.conf.set("spark.sql.crossJoin.enabled", true)

df.join(broadcast(df.agg(max($"id") as "max")))
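
Note that crossJoin and the spark session are Spark 2.x APIs. As a hedged sketch for Spark 1.6.1 (the version in the question), a join without a condition should produce the same Cartesian product directly:

import org.apache.spark.sql.functions.{broadcast, max}

// Spark 1.6 sketch: no crossJoin/SparkSession yet; a condition-less join
// yields the Cartesian product of df (3 rows) and the 1-row aggregate
df.join(broadcast(df.agg(max($"id") as "max"))).show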
-1
votes

There are a few categories of functions in Apache Spark.

  • Aggregate functions, e.g. max, used when we want to aggregate multiple rows into one
  • Non-aggregate functions, e.g. abs or isnull, used when we want to transform one column into another
  • Collection functions, e.g. explode, used when one row expands into multiple rows (a short sketch of all three categories follows this list)
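
A minimal sketch illustrating the three categories (abs and the array column xs are used here purely as representative examples, assuming the usual implicits are in scope):

import org.apache.spark.sql.functions.{abs, explode, max}

// aggregate: collapses all rows into a single row
df.select(max($"id")).show
// non-aggregate: transforms each row's value; the row count is unchanged
df.select(abs($"id")).show
// collection: expands one row into several
Seq((1, Seq("a", "b"))).toDF("id", "xs").select($"id", explode($"xs")).show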

Implicit aggregation

These are used when we want to aggregate multiple rows into one.

The following code internally has an aggregation.

df.select(max($"id")).explain

== Physical Plan ==
*HashAggregate(keys=[], functions=[max(id#3)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_max(id#3)])
      +- *Project [value#1 AS id#3]
         +- Scan ExistingRDD[value#1]

We can also use multiple aggregate functions in a single select.

df.select(max($"id"), min($"id")).explain
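
With min imported alongside max, the show variant of the same query should produce something like:

import org.apache.spark.sql.functions.{max, min}

df.select(max($"id"), min($"id")).show

+-------+-------+
|max(id)|min(id)|
+-------+-------+
|      3|      1|
+-------+-------+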

Aggregate functions cannot be mixed with non-aggregate functions directly.

The following code will report an error.

df.select(max($"id"), $"id")

df.withColumn("max", max($"id"))

This is because max($"id") produces fewer rows than $"id" does.
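
The error message itself suggests one workaround: wrap the non-aggregate column in first() if you do not care which of its values you get. A minimal sketch:

import org.apache.spark.sql.functions.{first, max}

// runs without error, but first($"id") picks an unspecified row's value
df.select(max($"id"), first($"id")).show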

Aggregate with over

In this case the analytic function is applied and its result is presented for all rows in the result set.

We can use

df.select(max($"id").over, $"id").show

Or

df.withColumn("max", max($"id").over).show
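
Both variants should produce the same result, with the maximum repeated on every row:

+---+---+
| id|max|
+---+---+
|  1|  3|
|  2|  3|
|  3|  3|
+---+---+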
-1
votes

This is Spark 2.0 here.

With withColumn and window functions it could be as follows:

df.withColumn("max", max('id) over)

Note the empty over, which assumes an "empty" window (and is equivalent to over()).

If you however need a more complete WindowSpec you can do the following (again, this is 2.0):

import org.apache.spark.sql.expressions._
// the trick that has performance cost (!)
val window = Window.orderBy()
df.withColumn("max", max('id) over window).show

Please note that the code has a serious performance issue as reported by Spark itself:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
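
If your real data has a natural grouping column, a hedged workaround is to partition the window by it (the df2 and group below are hypothetical). Note this computes a per-group maximum, not a global one, but it avoids moving all data to a single partition:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// hypothetical df2 with a "group" column: each group's max is computed
// within its own partition, so no single-partition shuffle is needed
val byGroup = Window.partitionBy($"group")
df2.withColumn("max", max($"id") over byGroup).show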