2
votes

I have a library in Scala for Spark which contains many functions. One example is the following function to unite two dataframes that have different columns:

def appendDF(df2: DataFrame): DataFrame = {

  val cols1 = df.columns.toSeq
  val cols2 = df2.columns.toSeq

  def expr(sourceCols: Seq[String], targetCols: Seq[String]): Seq[Column] = {
    targetCols.map({
      case x if sourceCols.contains(x) => col(x)
      case y                           => lit(null).as(y)
    })
  }

  // both df's need to pass through `expr` to guarantee the same order, as needed for correct unions.
  df.select(expr(cols1, cols1): _*).union(df2.select(expr(cols2, cols1): _*))

}

I would like to use this function (and many more) to Dataset[CleanRow] and not DataFrames. CleanRow is a simple class here that defines the names and types of the columns. My educated guess is to convert the Dataset into Dataframe using .toDF() method. However, I would like to know whether there are better ways to do it.

From my understanding, there shouldn't be many differences between Dataset and Dataframe since Dataset are just Dataframe[Row]. Plus, I think that from Spark 2.x the APIs for DF and DS have been unified, so I was thinking that I could pass either of them interchangeably, but that's not the case.

1
If the signature of the method cannot be changed (e.g. to accept a generic type), I guess you have to do Dataset.toDF(), otherwise if you are able to change the signature, can you make it def appendDF(ds: DataSet[A]) which can take Dataset[Row] and Dataset[T]? - jack
I see. So that is the only option excluding changing the method signature. Is it considered good practice or not (eg: for a production-level code)? Moreover, about your suggestion on changing the signature, if I change it to Dataset[A] then it can take also as argument somedata.toDF() right? It is merely out of curiosity. - GYBE
Yes, i posted in answer. - jack

1 Answers

1
votes

If changing signature is possible:

import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset

def f[T](d: Dataset[T]): Dataset[T] = {d}

// You are able to pass a dataframe:
f(Seq(0,1).toDF()).show
// res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value: int]

// You are also able to pass a dataset:
f(spark.createDataset(Seq(0,1)))
// res2: org.apache.spark.sql.Dataset[Int] = [value: int]