
I'm trying to create a trait to load data from a Hive table into a typed Dataset. Here's the code:

import org.apache.spark.sql.{Dataset, Row, SparkSession}

trait PartitionedHiveTableLoader[T] {
  def context: String
  def table: String
  def returnEntity: Row => T
  def load(sparkSession: SparkSession, token: String): Dataset[T] = {
    import sparkSession.implicits._
    sparkSession.sql(s"SELECT * from $context.$table where d = $token").
      map(returnEntity(_))
  }
  def load(sparkSession: SparkSession, lowBound: String, upperBound: String, includeLow: Boolean = true, includeUpper: Boolean = true): Dataset[T] = {
    import sparkSession.implicits._
    sparkSession.sql(s"SELECT * " +
      s"from $context.$table " +
      s"where d >${if(includeLow)"=" else ""} $lowBound " +
      s"and d<${if(includeUpper)"=" else ""} $upperBound").
      map(returnEntity(_))
  }
}

This trait is then used with an object as follows:

import org.apache.spark.sql.Row

object FreeUsersRightsLoader extends {} with PartitionedHiveTableLoader[FreeUsersRightsEntity] {
  def context: String = "analytics"
  def table: String =  "free_users_rights"
  def returnEntity: Row => FreeUsersRightsEntity = x => FreeUsersRightsDataset(x)
}

But when I compile it with mvn package, I get the following error:

error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

But I imported spark.implicits._ in every method... Does anybody know what the problem is?

Please provide the FreeUsersRightsDataset definition. There is no way to tell if the problem is the scope or the class itself without it. – zero323
Actually, the problem is exactly FreeUsersRightsDataset, which is a type that Spark does not know how to serialize. Does it implement the Serializable interface? That should be enough to make it work, albeit Java serialization won't be the most efficient. – Roberto Congiu

1 Answer


The implicit Encoder for the type T must be available at compile time for the map calls you're making.

When you import sparkSession.implicits._, you bring into scope built-in encoders for many common types (e.g. String, Long, arrays, case classes, etc.). But inside the trait, T is unknown and unbound, so it could be anything, and not every class has a built-in encoder, hence this import isn't enough on its own.
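
To illustrate the difference, here's a minimal sketch (the User case class is hypothetical): an encoder can be derived for a concrete case class, but not for an unconstrained type parameter:

import org.apache.spark.sql.{Encoder, Encoders}

// A concrete case class: Spark can derive an encoder for it.
case class User(id: Long, name: String)
val userEncoder: Encoder[User] = Encoders.product[User]

// An unbound type parameter: nothing tells the compiler how to encode T,
// so the line below would not compile.
// def encoderFor[T]: Encoder[T] = Encoders.product[T]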

To fix that, add an implicit Encoder argument to each method signature:

// Add Encoder to the existing import:
// import org.apache.spark.sql.{Dataset, Encoder, Row, SparkSession}

def load(sparkSession: SparkSession, token: String)(implicit enc: Encoder[T]): Dataset[T] = {
  sparkSession.sql(s"SELECT * from $context.$table where d = $token").
    map(returnEntity(_))
}

def load(sparkSession: SparkSession,
         lowBound: String,
         upperBound: String,
         includeLow: Boolean = true,
         includeUpper: Boolean = true)(implicit enc: Encoder[T]): Dataset[T] = {
  sparkSession.sql(s"SELECT * " +
    s"from $context.$table " +
    s"where d >${if(includeLow)"=" else ""} $lowBound " +
    s"and d<${if(includeUpper)"=" else ""} $upperBound").
    map(returnEntity(_))
}

Then, you'll need the built-in implicit available wherever these methods are called, i.e. where the type T is known to be FreeUsersRightsEntity (which I assume is one of the supported types, e.g. a case class containing primitives and collections):

import spark.implicits._

FreeUsersRightsLoader.load(spark, "token")
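
As a design alternative (a sketch only, assuming FreeUsersRightsEntity is a case class), you could instead declare the encoder as an abstract member of the trait, so each concrete loader supplies it once and call sites don't need the encoder in scope:

import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row, SparkSession}

trait PartitionedHiveTableLoader[T] {
  def context: String
  def table: String
  def returnEntity: Row => T
  // Supplied once by each concrete loader; picked up implicitly by map below.
  implicit def entityEncoder: Encoder[T]

  def load(sparkSession: SparkSession, token: String): Dataset[T] =
    sparkSession.sql(s"SELECT * from $context.$table where d = $token").
      map(returnEntity(_))
}

object FreeUsersRightsLoader extends PartitionedHiveTableLoader[FreeUsersRightsEntity] {
  def context: String = "analytics"
  def table: String = "free_users_rights"
  def returnEntity: Row => FreeUsersRightsEntity = x => FreeUsersRightsDataset(x)
  // Works only if FreeUsersRightsEntity is a case class (or otherwise a Product).
  implicit def entityEncoder: Encoder[FreeUsersRightsEntity] =
    Encoders.product[FreeUsersRightsEntity]
}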