I'm trying to create a trait that loads data from a Hive table into a typed Dataset. Here's the code:
import org.apache.spark.sql.{Dataset, Row, SparkSession}

trait PartitionedHiveTableLoader[T] {
  def context: String
  def table: String
  def returnEntity: Row => T

  def load(sparkSession: SparkSession, token: String): Dataset[T] = {
    import sparkSession.implicits._
    sparkSession.sql(s"SELECT * FROM $context.$table WHERE d = $token")
      .map(returnEntity(_))
  }

  def load(sparkSession: SparkSession, lowBound: String, upperBound: String,
           includeLow: Boolean = true, includeUpper: Boolean = true): Dataset[T] = {
    import sparkSession.implicits._
    sparkSession.sql(s"SELECT * " +
      s"FROM $context.$table " +
      s"WHERE d >${if (includeLow) "=" else ""} $lowBound " +
      s"AND d <${if (includeUpper) "=" else ""} $upperBound")
      .map(returnEntity(_))
  }
}
This trait is then used by an object as follows:
import org.apache.spark.sql.Row

object FreeUsersRightsLoader extends {} with PartitionedHiveTableLoader[FreeUsersRightsEntity] {
  def context: String = "analytics"
  def table: String = "free_users_rights"
  def returnEntity: Row => FreeUsersRightsEntity = x => FreeUsersRightsDataset(x)
}
But when I compile it with mvn package, I get the following error:
error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
But I imported spark.implicits._ in every method... Does anybody know what the problem is?
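(A note on why the import alone doesn't help: spark.implicits._ can only derive an Encoder for a concrete type. Inside the trait, T is an unconstrained type parameter, so at the point where map is compiled there is no Encoder[T] to be found, no matter where the import sits. A minimal sketch of the contrast, with hypothetical names:)

import org.apache.spark.sql.{Dataset, SparkSession}

object EncoderScopeDemo {
  // Compiles: String is concrete, so spark.implicits._ supplies Encoder[String].
  def concrete(spark: SparkSession): Dataset[String] = {
    import spark.implicits._
    spark.sql("SELECT 1 AS id").map(_.getInt(0).toString)
  }

  // Would NOT compile: T is unbound here, so no Encoder[T] can be resolved;
  // this reproduces the "Unable to find encoder" error above.
  // def generic[T](spark: SparkSession, f: Int => T): Dataset[T] = {
  //   import spark.implicits._
  //   spark.sql("SELECT 1 AS id").map(r => f(r.getInt(0)))
  // }
}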
Please include the FreeUsersRightsDataset definition. There is no way to tell if the problem is scope or the class itself without it. – zero323

The Dataset holds a FreeUsersRightsDataset, which is a type that Spark does not know how to serialize. Does it implement the Serializable interface? That should be enough to make it work, albeit Java serialization won't be the most efficient. – Roberto Congiu
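(Following up on those comments: for Dataset elements the actual requirement is an Encoder rather than Serializable; Encoders.kryo[T] is the catch-all analogue of the Serializable suggestion. A common pattern, sketched here as one possible fix rather than the asker's confirmed solution, is to require an implicit Encoder[T] on the load methods, deferring resolution to the call site where T is concrete; if FreeUsersRightsEntity is a case class, spark.implicits._ can then derive the encoder. Field and column names below are hypothetical:)

import org.apache.spark.sql.{Dataset, Encoder, Row, SparkSession}

trait PartitionedHiveTableLoader[T] {
  def context: String
  def table: String
  def returnEntity: Row => T

  // The Encoder is now a parameter: the compiler resolves it where load is
  // called, at which point T is a concrete type such as FreeUsersRightsEntity.
  def load(sparkSession: SparkSession, token: String)(implicit enc: Encoder[T]): Dataset[T] =
    sparkSession.sql(s"SELECT * FROM $context.$table WHERE d = $token")
      .map(returnEntity(_))
}

// Hypothetical fields; a case class makes the Encoder derivable.
case class FreeUsersRightsEntity(userId: Long, d: String)

object FreeUsersRightsLoader extends PartitionedHiveTableLoader[FreeUsersRightsEntity] {
  def context: String = "analytics"
  def table: String = "free_users_rights"
  def returnEntity: Row => FreeUsersRightsEntity =
    r => FreeUsersRightsEntity(r.getAs[Long]("user_id"), r.getAs[String]("d"))
}

At the call site, import sparkSession.implicits._ (or pass Encoders.product[FreeUsersRightsEntity] explicitly) then supplies the encoder, e.g. FreeUsersRightsLoader.load(sparkSession, "20240101"). The range overload would take the same implicit parameter.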