
I am using Databricks Community Edition with Spark 2.0 preview. I tried the following (simple) code:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import java.util.Calendar
import spark.implicits._

case class C1(f1: String, f2: String, f3: String, f4:java.sql.Date, f5: Double)
val teams = sc.parallelize(Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),
  C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72)
)).toDS

object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] {
  def zero: Seq[C1] = Seq.empty[C1]                       // initial (empty) buffer; Nil also works
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a         // fold one input row into the buffer
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2 // combine partial buffers
  def finish(r: Seq[C1]): Seq[C1] = r                     // the buffer is already the result

  override def bufferEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1]
  override def outputEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1]
}
val g_c1 = teams.groupByKey(_.f1).agg[Seq[C1]](C1Agg.toColumn).collect

I get the following error message:

error: type mismatch;
 found   : org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
 required: org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
val g_c1 = teams.groupByKey(_.f1).agg[Seq[C1]](C1Agg.toColumn).collect

When I use

val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).collect

I get:

error: type mismatch;
 found   : org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
 required: org.apache.spark.sql.TypedColumn[C1,?]
val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).collect

Any hints?


1 Answer


I found the reason: I was declaring the case class in one notebook cell and then using it in subsequent cells. In the notebook (as in the Scala REPL), each cell is compiled into its own wrapper object, so the C1 defined in one cell is a different class from the C1 referenced in another, which is why the found and required types print identically yet fail to match.

Putting the whole code in the same cell solves the problem. (Unfortunately, I am now facing another problem: MissingRequirementError.)
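
For reference, a minimal sketch of the single-cell layout that works for me, assuming the same Databricks Community Edition / Spark 2.0-preview session (where sc and spark are predefined):

// One cell: case class, aggregator, and usage all together,
// so they all see the same compiled C1 class.
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import spark.implicits._

case class C1(f1: String, f2: String, f3: String, f4: java.sql.Date, f5: Double)

object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] {
  def zero: Seq[C1] = Seq.empty[C1]
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r
  override def bufferEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1]
  override def outputEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1]
}

val teams = sc.parallelize(Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12)
)).toDS

val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).collect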