4 votes

Is there a way to cast all the values of a DataFrame using a StructType?

Let me explain my question with an example:

Let's say we obtained a DataFrame after reading from a file (I am providing code that generates this DataFrame, but in my real-world project I obtain it by reading from a file):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import spark.implicits._  // assumes an existing SparkSession named spark (e.g., in spark-shell)
    // Every value is read as a string; the goal is to cast the values afterwards
    val rows1 = Seq(
      Row("1", Row("a", "b"), "8.00", Row("1","2")),
      Row("2", Row("c", "d"), "9.00", Row("3","4"))
    )

    val rows1Rdd = spark.sparkContext.parallelize(rows1, 4)

    val schema1 = StructType(
      Seq(
        StructField("id", StringType, true),
        StructField("s1", StructType(
          Seq(
            StructField("x", StringType, true),
            StructField("y", StringType, true)
          )
        ), true),
        StructField("d", StringType, true),
        StructField("s2", StructType(
          Seq(
            StructField("u", StringType, true),
            StructField("v", StringType, true)
          )
        ), true)
      )
    )

    // Build the DataFrame with the all-string schema
    val df1 = spark.createDataFrame(rows1Rdd, schema1)

    println("Schema with nested struct")
    df1.printSchema()

    root
    |-- id: string (nullable = true)
    |-- s1: struct (nullable = true)
    |    |-- x: string (nullable = true)
    |    |-- y: string (nullable = true)
    |-- d: string (nullable = true)
    |-- s2: struct (nullable = true)
    |    |-- u: string (nullable = true)
    |    |-- v: string (nullable = true)

Now let's say my client provided the schema they want for the data (equivalent in structure to the schema of the DataFrame we read, but with different data types: StringType, IntegerType, DoubleType, and so on):

    val wantedSchema = StructType(
      Seq(
        StructField("id", IntegerType, true),
        StructField("s1", StructType(
          Seq(
            StructField("x", StringType, true),
            StructField("y", StringType, true)
          )
        ), true),
        StructField("d", DoubleType, true),
        StructField("s2", StructType(
          Seq(
            StructField("u", IntegerType, true),
            StructField("v", IntegerType, true)
          )
        ), true)
      )
    )

What's the best way to cast the DataFrame's values using the provided StructType?

It would be great if there were a method we could apply to a DataFrame that takes the new StructType and casts all the values by itself.

PS: This is a small DataFrame used as an example; in my project the DataFrame contains many more rows. If it were a small DataFrame with few columns, I could do the cast easily, but in my case I am looking for a smart solution that casts all the values by applying a StructType, without having to cast each column/value manually in the code.

I will be grateful for any help you can provide. Thanks a lot!


2 Answers

3 votes

After a lot of research, here's a generic solution to cast a DataFrame to a given schema:

    // Generate one CAST expression per top-level field of the wanted schema;
    // casting a struct column casts its nested fields as well.
    val castedDf = df1.selectExpr(wantedSchema.map(
      field => s"CAST(${field.name} AS ${field.dataType.sql}) AS ${field.name}"
    ): _*)
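
For this example, the generated expressions come out roughly as follows (the exact type strings produced by field.dataType.sql, including identifier quoting, may differ between Spark versions):

    CAST(id AS INT) AS id
    CAST(s1 AS STRUCT<`x`: STRING, `y`: STRING>) AS s1
    CAST(d AS DOUBLE) AS d
    CAST(s2 AS STRUCT<`u`: INT, `v`: INT>) AS s2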

Here's the schema of the cast DataFrame:

    castedDf.printSchema
    root
    |-- id: integer (nullable = true)
    |-- s1: struct (nullable = true)
    |    |-- x: string (nullable = true)
    |    |-- y: string (nullable = true)
    |-- d: double (nullable = true)
    |-- s2: struct (nullable = true)
    |    |-- u: integer (nullable = true)
    |    |-- v: integer (nullable = true)

I hope this helps someone; I spent 5 days looking for this simple, generic solution.
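
If you prefer not to build SQL strings, the same idea can be written with the Column API. Here is a minimal sketch of the equivalent, assuming the same df1 and wantedSchema as above (castedDf2 is just an illustrative name):

    import org.apache.spark.sql.functions.col

    // Cast each top-level column to the type declared in the wanted schema;
    // nested struct fields are cast as part of the struct cast.
    val castedDf2 = df1.select(wantedSchema.fields.map(
      field => col(field.name).cast(field.dataType).as(field.name)
    ): _*)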

0 votes

There's no fully automatic way to perform the conversion. You can express the conversion logic in Spark SQL and convert everything in one pass; the resulting SQL might get quite big if you have a lot of fields, but at least you keep all your transformations in one place.

Example:

    df1.selectExpr(
      "CAST(id AS INTEGER) AS id",
      "STRUCT(s1.x, s1.y) AS s1",
      "CAST(d AS DOUBLE) AS d",  // d becomes DOUBLE, matching the wanted schema
      "NAMED_STRUCT('u', CAST(s2.u AS INTEGER), 'v', CAST(s2.v AS INTEGER)) AS s2"  // named_struct preserves the nested field names u and v
    ).show()

One thing to watch out for: whenever a conversion fails (e.g., when d is not a number), you get a NULL. One option is to run some validation prior to the conversion and filter df1 so that only the valid records are converted, as in the sketch below.
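
For example, here is a minimal validation sketch, assuming import spark.implicits._ is in scope for the $ syntax; the rule used here ("d must be null or parse as a double") is only an illustration, and real validation depends on your data:

    // Rows whose d is already null, or survives the cast, are safe to convert
    val validRows = df1.filter($"d".isNull || $"d".cast("double").isNotNull)

    // Rows where the cast would silently produce NULL can be inspected separately
    val invalidRows = df1.filter($"d".isNotNull && $"d".cast("double").isNull)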