
I am trying to move from Spark 1.6 to Spark 2.0, and I get this error during compilation on 2.0 only:

    def getSubGroupCount(df: DataFrame, colNames: String): Array[Seq[Any]] = {
      val columns: Array[String] = colNames.split(',')
      val subGroupCount: Array[Seq[Any]] = columns.map(c => df.select(c).distinct.map(x => x.get(0)).collect.toSeq)
      subGroupCount
    }

    Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
    val subGroupCount: Array[Seq[Any]] = columns.map(c => df.select(c).distinct.map(x => x.get(0)).collect.toSeq)

Regards


1 Answer


The method DataFrame.map has changed between the versions:

  • In Spark 1.6, it operates on the underlying RDD[Row] and returns an RDD:

    def map[R](f: (Row) ⇒ R)(implicit arg0: ClassTag[R]): RDD[R]
    
  • In Spark 2.0, DataFrame is just an alias for Dataset[Row], and therefore it returns a Dataset:

    def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] 
    

As you can see, the latter expects an implicit Encoder argument, which is missing in your case.

Why is the Encoder missing?

All the default encoders will be in scope once you import spark.implicits._. However, since the result type of your mapping is Any (x => x.get(0) returns Any), there is no Encoder for it.
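
To illustrate (a minimal sketch; the local SparkSession, the DataFrame df, and the column letter are just for demonstration):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.master("local[*]").appName("encoder-demo").getOrCreate()
    import spark.implicits._

    val df = Seq("a", "b", "a").toDF("letter")

    // Compiles: the lambda returns String, and spark.implicits._ provides Encoder[String]
    val ok = df.map(x => x.getString(0))

    // Does not compile: the lambda returns Any, and there is no Encoder[Any]
    // val broken = df.map(x => x.get(0))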

How to fix this?

  1. If there's a common type (say, String, for the sake of example) for all the columns you're interested in, you can use getAs[String](0) to make the mapping function's return type specific. Once the above-mentioned import is added, such types (primitives and Products) will have a matching Encoder in scope (see the sketch after this list).

  2. If you don't have a known type common to all the relevant columns and want to retain the same behavior, you can get the DataFrame's RDD via .rdd and use that RDD's map operation, which is identical to the pre-2.0 behavior:

    columns.map(c => df.select(c).distinct.rdd.map(x => x.get(0)).collect.toSeq)
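
Applying option 1 to the original method, for instance, could look like this (a sketch assuming every selected column is actually a String, and that spark.implicits._ is imported):

    import spark.implicits._  // brings Encoder[String] into scope

    def getSubGroupCount(df: DataFrame, colNames: String): Array[Seq[String]] = {
      val columns: Array[String] = colNames.split(',')
      // getAs[String](0) fixes the result type to String, so a matching Encoder is found
      columns.map(c => df.select(c).distinct.map(x => x.getAs[String](0)).collect.toSeq)
    }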