3 votes

I'm trying to update my app from Spark 1.6.2 to 2.0.0. My problem is creating a Dataset from a DataFrame (Parquet data that I read).

I know that I can use a case class or a tuple to type the DataFrame and get a Dataset, but before runtime I don't know which data the user will load, so I don't know the column types or how many columns there will be.

To load the data I read the Parquet files with a SparkSession, simply like:

spark.read.schema(schema).parquet(dataPath)

schema is a StructType that I build from a List[Map[String, String]] containing each column's name and its type (which is either String or Double).
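For context, the StructType is built roughly like this; the listOfColumns value and its "name"/"type" keys below are just placeholders for the metadata provided at runtime:

  import org.apache.spark.sql.types.{DataType, DoubleType, StringType, StructField, StructType}

  // Placeholder metadata describing the columns the user wants to load.
  val listOfColumns: List[Map[String, String]] = List(
    Map("name" -> "price", "type" -> "Double"),
    Map("name" -> "label", "type" -> "String")
  )

  // Map the declared type name to a Spark data type (only String or Double here).
  def toSparkType(typeName: String): DataType = typeName match {
    case "Double" => DoubleType
    case _        => StringType
  }

  val schema: StructType = StructType(
    listOfColumns.map(c => StructField(c("name"), toSparkType(c("type")), nullable = true))
  )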

I found this on Stack Overflow, but I struggle to understand it and wonder whether there isn't an easier way to solve my problem: Dynamically compiling scala class files at runtime in Scala 2.11

Thanks

1
From what I read, the performance should be superior to using map, and without changing all the legacy code in my app; check this: stackoverflow.com/questions/39433419/… – Yanmanlik
Coding type-generic logic in Scala that requires obtaining the type of something at run-time can be quite cumbersome. Spark bypasses this using expressions, enabling SQL-like behaviour for column types (which are therefore checked at run-time). This is why you probably want to leverage the type genericity of DataFrames, or the flexibility of Row, to handle that rather than having to do everything yourself. Or? – Wilmerton
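To illustrate the comment above, here is a minimal sketch of handling columns generically through the schema and the untyped Row API, assuming df is the DataFrame loaded from Parquet as in the question:

  import org.apache.spark.sql.types.{DoubleType, StringType}

  // Inspect the columns that were actually loaded, without knowing them at compile time.
  df.schema.fields.foreach { field =>
    field.dataType match {
      case DoubleType => println(s"${field.name} is a Double column")
      case StringType => println(s"${field.name} is a String column")
      case other      => println(s"${field.name} has type $other")
    }
  }

  // Read values generically from each untyped Row, by column name.
  val firstColumn = df.schema.fieldNames.head
  df.collect().foreach(row => println(row.getAs[Any](firstColumn)))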

1 Answer

0 votes

Create an implicit conversion from Spark data types to Scala native data types.

Then map those types over the StructFields of the Spark DataFrame's schema to generate a case class definition as a string.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types._

  val spark = SparkSession
    .builder
    .appName("Movies Reviews")
    .config("spark.master", "local")
    .getOrCreate()

  import spark.implicits._

  // Example DataFrame standing in for the Parquet data loaded at runtime.
  val someDF = Seq(
    (8, "bat"),
    (64, "mouse"),
    (-27, "horse")
  ).toDF("number", "word")

  someDF.printSchema()

  // Builds the source code of a case class matching the given schema.
  // Nullable columns become Option[...] fields.
  def schemaCaseClass(schema: StructType, className: String)
                     (implicit sparkTypeToScala: DataType => String): String = {
    def structField(col: StructField): String = {
      val scalaType = sparkTypeToScala(col.dataType)
      if (col.nullable) s"  ${col.name}: Option[$scalaType]"
      else s"  ${col.name}: $scalaType"
    }

    val fields = schema.map(structField).mkString(",\n")
    s"""|case class $className (
        |$fields
        |)
        |""".stripMargin
  }

  // Implicit conversion from Spark SQL data types to Scala native type names.
  implicit val scalaTypes: DataType => String = {
    case _: ByteType      => "Byte"
    case _: ShortType     => "Short"
    case _: IntegerType   => "Int"
    case _: LongType      => "Long"
    case _: FloatType     => "Float"
    case _: DoubleType    => "Double"
    case _: DecimalType   => "java.math.BigDecimal"
    case _: StringType    => "String"
    case _: BinaryType    => "Array[Byte]"
    case _: BooleanType   => "Boolean"
    case _: TimestampType => "java.sql.Timestamp"
    case _: DateType      => "java.sql.Date"
    case _: ArrayType     => "scala.collection.Seq"
    case _: MapType       => "scala.collection.Map"
    case _: StructType    => "org.apache.spark.sql.Row"
    case _                => "String"
  }

  println(schemaCaseClass(someDF.schema, "someDF"))
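For the example DataFrame above, the printed output should look roughly like this (only the String column is nullable when the DataFrame comes from a Seq of tuples):

  root
   |-- number: integer (nullable = false)
   |-- word: string (nullable = true)

  case class someDF (
    number: Int,
    word: Option[String]
  )

From there, the generated source can be compiled at runtime if an actual class is needed, for example with the Scala reflection ToolBox as discussed in the question linked above, before calling .as[...] on the DataFrame.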