9
votes

I have the following case class:

case class User(userId: String)

and the following schema:

+--------------------+------------------+
|            col_name|         data_type|
+--------------------+------------------+
|             user_id|            string|
+--------------------+------------------+

When I try to convert a DataFrame to a typed Dataset[User] with spark.read.table("MyTable").as[User], I get an error saying the field names don't match:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
    cannot resolve '`user_id`' given input columns: [userId];;

Is there any simple way to solve this without breaking Scala idioms and naming my fields user_id? Of course, my real table has a lot more fields, and I have a lot more case classes / tables, so it's not feasible to manually define an Encoder for each case class (and I don't know macros well enough, so that's out of the question; though I'm happy to use one if such a macro exists!).
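To make the problem concrete: the workaround I'm trying to avoid is renaming every column by hand before the conversion, which obviously doesn't scale past a handful of fields:

// Works for one small table, but not for dozens of columns across many tables:
val users = spark.read.table("MyTable")
  .withColumnRenamed("user_id", "userId")
  .as[User]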

I feel like I'm missing a very obvious "convert snake_case to camelCase=true" option, since one exists in practically any ORM I've worked with.

1
I feel like I'm missing a very obvious "convert snake_case to camelCase=true" - you don't. If I'm not mistaken, there is an old JIRA ticket that targets something similar, but for now you have to rename things. – zero323
@user6910411 Bummer :( If you answer with the JIRA ticket, I'll accept the answer. – Gal
@Gal Three years later, have you found a better solution to this? – DanielR
@DanielR Unfortunately, no. I resigned myself to just naming my case class fields in snake_case if they represent a Spark table. – Gal

1 Answer

1
vote

You can rename the columns on the untyped DataFrame before converting to a Dataset; toDF accepts a complete set of new column names:
scala> val df = Seq(("Eric", "Theodore", "Cartman"), ("Butters", "Leopold", "Stotch")).toDF.select(concat($"_1", lit(" "), $"_2") as "first_and_middle_name", $"_3" as "last_name")
df: org.apache.spark.sql.DataFrame = [first_and_middle_name: string, last_name: string]

scala> df.show
+---------------------+---------+
|first_and_middle_name|last_name|
+---------------------+---------+
|        Eric Theodore|  Cartman|
|      Butters Leopold|   Stotch|
+---------------------+---------+


scala> val ccnames = df.columns.map(sc => {val ccn = sc.split("_")
    | (ccn.head +: ccn.tail.map(_.capitalize)).mkString
    | })
ccnames: Array[String] = Array(firstAndMiddleName, lastName)

scala> df.toDF(ccnames: _*).show
+------------------+--------+
|firstAndMiddleName|lastName|
+------------------+--------+
|     Eric Theodore| Cartman|
|   Butters Leopold|  Stotch|
+------------------+--------+
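From there, the typed conversion is the usual .as[...] call. A minimal sketch, assuming a case class whose field names match the renamed columns (FullName is made up for this example; in spark-shell the required implicits are already in scope):

case class FullName(firstAndMiddleName: String, lastName: String)

// With camelCase columns, the encoder resolves every field by name.
val ds = df.toDF(ccnames: _*).as[FullName]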

EDIT: Would this help? Define a single function that takes a path: String and a loader: String => DataFrame:

scala> val parquetloader = spark.read.parquet _
parquetloader: String => org.apache.spark.sql.DataFrame = <function1>

scala> val tableloader = spark.read.table _
tableloader: String => org.apache.spark.sql.DataFrame = <function1>

scala> val textloader = spark.read.text _
textloader: String => org.apache.spark.sql.DataFrame = <function1>

// csv loader and others

import org.apache.spark.sql.DataFrame

def snakeCaseToCamelCaseDataFrameColumns(path: String, loader: String => DataFrame): DataFrame = {
  val df = loader(path)  // load once; reuse for both the rename and the result
  val ccnames = df.columns.map { sc =>
    val parts = sc.split("_")
    (parts.head +: parts.tail.map(_.capitalize)).mkString
  }
  df.toDF(ccnames: _*)
}


val oneDF = snakeCaseToCamelCaseDataFrameColumns("/path/to/table", tableloader)
val twoDF = snakeCaseToCamelCaseDataFrameColumns("/path/to/parquet/file", parquetloader)
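Applied back to the question's example, that would be something like (MyTable and User taken from the question):

// Camel-case the columns, then ask for the typed Dataset:
val users = snakeCaseToCamelCaseDataFrameColumns("MyTable", tableloader).as[User]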