
We are using Spark 2.x with Scala for a system that has 13 different ETL operations. 7 of them are relatively simple: each is driven by a single domain class, and they differ primarily in that class and in some nuances of how the load is handled.

A simplified version of the load class is as follows. For the purposes of this example, say there are 7 pizza toppings being loaded; here's Pepperoni:

import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Pepperoni (the domain class) and PepperoniRaw are case classes defined elsewhere.
object LoadPepperoni {
  def apply(inputFile: Dataset[Row],
            historicalData: Dataset[Pepperoni],
            mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = {
    val sparkSession = SparkSession.builder().getOrCreate()
    import sparkSession.implicits._

    // Map the raw input rows onto the "raw" case class.
    val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map { case row: Row =>
      PepperoniRaw(
        weight = row.getAs[String]("weight"),
        cost = row.getAs[String]("cost")
      )
    }.toDS()

    val validatedData: Dataset[PepperoniRaw] = ??? // validate the data

    val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data

    // Map from the "raw" class to the domain class.
    val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map { case datum: PepperoniRaw =>
      Pepperoni(value = ???, key1 = ???, key2 = ???)
    }.toDS()

    // joinWith yields tuples whose _1 comes from dedupedData (the delta) and
    // _2 from historicalData.
    val joinedData = dedupedData.joinWith(historicalData,
      historicalData.col("key1") === dedupedData.col("key1") &&
        historicalData.col("key2") === dedupedData.col("key2"),
      "right_outer"
    )

    // Merge each delta into its historical record (this is where mergeFun
    // comes into play), keeping only the rows that satisfy some condition.
    joinedData.flatMap { case (delta, hist) =>
      if (??? /* some condition */)
        Seq(hist.copy(value = ??? /* some transformation */))
      else
        Seq.empty[Pepperoni]
    }
  }
}

In other words, the class performs a series of operations on the data. The operations are mostly the same and always run in the same order, but they can vary slightly per topping, as can the mapping from "raw" to "domain" and the merge function.
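For example, a Pepperoni-specific merge might look something like this (the actual combination logic is elided):

// Placeholder sketch of one topping's merge function, matching the
// (Pepperoni, PepperoniRaw) => Pepperoni signature used above.
val mergePepperoni: (Pepperoni, PepperoniRaw) => Pepperoni =
  (hist, raw) => hist.copy(value = ??? /* combine hist and raw somehow */)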

To do this for 7 toppings (e.g. Mushroom, Cheese, etc.), I would rather not simply copy/paste the class and change all of the names, because the structure and logic are common to all loads. Instead I'd rather define a generic "Load" class with generic types, like this:

object Load {
  def apply[R,D](inputFile: Dataset[Row],
            historicalData: Dataset[D],
            mergeFun: (D, R) => D): Dataset[D] = {
    val sparkSession = SparkSession.builder().getOrCreate()
    import sparkSession.implicits._

    val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row =>
...

And for each class-specific operation, such as mapping from "raw" to "domain" or merging, I'd have a trait or abstract class that implements the specifics. This would be a typical dependency injection / polymorphism pattern.
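Concretely, I'm picturing something like this hypothetical trait, with one implementation per topping:

import org.apache.spark.sql.Row

// Hypothetical trait capturing only the pieces that differ per topping; each
// topping (Pepperoni, Mushroom, Cheese, ...) would supply an implementation.
trait ToppingSupport[R, D] {
  def fromRow(row: Row): R        // Row -> "raw" class
  def toDomain(raw: R): D         // "raw" -> domain class
  def merge(hist: D, raw: R): D   // the topping-specific mergeFun
}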

But I'm running into a few problems. As of Spark 2.x, encoders are only provided for native types and case classes, and there is no way to generically identify a class as a case class. So the inferred toDS() and other implicit functionality are not available when using generic types.
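For example, even this stripped-down version of the mapping step won't compile:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

// Does not compile: the implicit conversion that provides .toDS() requires an
// Encoder[T], and no Encoder can be found for a bare type parameter T.
def toDataset[T](spark: SparkSession, rdd: RDD[T]): Dataset[T] = {
  import spark.implicits._
  rdd.toDS()
}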

Also as mentioned in this related question of mine, the case class copy method is not available when using generics either.

I have looked into other design patterns common in Scala and Haskell, such as type classes and ad-hoc polymorphism, but the obstacle is that Spark Datasets essentially only work with case classes, which can't be defined abstractly.

It seems that this would be a common problem in Spark systems, but I'm unable to find a solution. Any help is appreciated.


1 Answer


The implicit conversion that enables .toDS is:

implicit def rddToDatasetHolder[T](rdd: RDD[T])(implicit arg0: Encoder[T]): DatasetHolder[T]

(from https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicits)

You are exactly correct in that there's no implicit value in scope for Encoder[T] now that you've made your apply method generic, so this conversion can't happen. But you can simply accept one as an implicit parameter!

object Load {
  def apply[R,D](inputFile: Dataset[Row],
            historicalData: Dataset[D],
            mergeFun: (D, R) => D)(implicit enc: Encoder[D]): Dataset[D] = {
...
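Since the body also builds a Dataset[R] via .toDS(), you'll presumably want an Encoder[R] in the same way, e.g.:

// Same idea, asking for encoders for both the raw and the domain type
// (Encoder here is org.apache.spark.sql.Encoder).
object Load {
  def apply[R,D](inputFile: Dataset[Row],
            historicalData: Dataset[D],
            mergeFun: (D, R) => D)(implicit rawEnc: Encoder[R], domEnc: Encoder[D]): Dataset[D] = {
...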

Then, at the time you call the load with a specific type, the compiler should be able to find an Encoder for that type. Note that you will have to import sparkSession.implicits._ in the calling context as well.
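For example, the call site might look roughly like this, where inputFile, historicalPepperoni and mergePepperoni are placeholders for your actual inputs:

import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._ // brings the case-class Encoders into scope here

// The concrete types are known at the call site, so Encoder[Pepperoni]
// (and Encoder[PepperoniRaw], if you require one) is resolved implicitly.
val merged: Dataset[Pepperoni] =
  Load[PepperoniRaw, Pepperoni](inputFile, historicalPepperoni, mergePepperoni)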

Edit: a similar approach would be to enable the implicit newProductEncoder[T <: Product](implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[T]): Encoder[T] to work by bounding your type (apply[R, D <: Product]) and accepting an implicit JavaUniverse.TypeTag[D] as a parameter.
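Roughly, that variant would look like:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Sketch only: bounding D to Product (every case class extends Product) and
// taking a TypeTag[D] via the context bound lets newProductEncoder derive the
// Encoder[D] once sparkSession.implicits._ is imported inside the method.
object Load {
  def apply[R, D <: Product : TypeTag](inputFile: Dataset[Row],
                                       historicalData: Dataset[D],
                                       mergeFun: (D, R) => D): Dataset[D] = {
    val sparkSession = SparkSession.builder().getOrCreate()
    import sparkSession.implicits._
    ??? // body as before; .toDS() calls that produce a Dataset[D] now compile
  }
}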