I ran the following spark-shell exercise:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala> case class Test(notNullable:String, nullable:Option[String])
defined class Test

scala> val myArray = Array(
     | Test("x", None),
     | Test("y", Some("z"))
     | )
myArray: Array[Test] = Array(Test(x,None), Test(y,Some(z)))

scala> val rdd = sc.parallelize(myArray)
rdd: org.apache.spark.rdd.RDD[Test] = ParallelCollectionRDD[0] at parallelize at <console>:28

scala> rdd.toDF.printSchema
root
 |-- notNullable: string (nullable = true)
 |-- nullable: string (nullable = true)

I've read (in Spark in Action) that, given a case class with Option fields, the non-Option fields should be inferred as not nullable. Is that even true? If so, what am I doing wrong here?

1 Answer

There are two issues here:

  1. Non-optional fields are inferred as non-nullable only for certain types (Int, Long, Short, Double, Float, Byte, Boolean), and String is not one of them. You can see the behavior with Int, for example:

    case class Test(notNullable: String,
                    nullable: Option[String],
                    notNullInt: Int,
                    nullableInt: Option[Int])
    
    val myArray = Array(
      Test("x", None, 1, None),
      Test("y", Some("z"), 2, Some(3))
    )
    
    myArray.toSeq.toDF().printSchema
    // root
    //  |-- notNullable: string (nullable = true)
    //  |-- nullable: string (nullable = true)
    //  |-- notNullInt: integer (nullable = false) // !!!
    //  |-- nullableInt: integer (nullable = true)
    

    This can be seen by inspecting the code of org.apache.spark.sql.catalyst.ScalaReflection.schemaFor:

    def schemaFor(tpe: `Type`): Schema = ScalaReflectionLock.synchronized {
      tpe match {
        // ...
        case t if t <:< localTypeOf[String] => Schema(StringType, nullable = true)
        // ...
        case t if t <:< definitions.IntTpe => Schema(IntegerType, nullable = false)
        case t if t <:< definitions.LongTpe => Schema(LongType, nullable = false)
        case t if t <:< definitions.DoubleTpe => Schema(DoubleType, nullable = false)
        case t if t <:< definitions.FloatTpe => Schema(FloatType, nullable = false)
        case t if t <:< definitions.ShortTpe => Schema(ShortType, nullable = false)
        case t if t <:< definitions.ByteTpe => Schema(ByteType, nullable = false)
        case t if t <:< definitions.BooleanTpe => Schema(BooleanType, nullable = false)
        // ...
      }
    }
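
    For completeness, Option fields go through a separate case in the same pattern match, which is why they always come out nullable regardless of the wrapped type. As far as I can tell from the 2.1.x source (worth double-checking against your exact version), that case looks roughly like this:

    case t if t <:< localTypeOf[Option[_]] =>
      // unwrap the Option and keep the inner type's DataType,
      // but force the column to be nullable
      val TypeRef(_, _, Seq(optType)) = t
      Schema(schemaFor(optType).dataType, nullable = true)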
    
  2. Apparently there is a different code path for inferring the schema when you first create an RDD and then convert it into a DataFrame, as opposed to converting the local collection into a DataFrame directly - the two behave differently:

    case class Test(notNullInt: Int, nullableInt: Option[Int])
    
    val myArray = Array(
      Test(1, None),
      Test(2, Some(3))
    )
    
    sc.parallelize(myArray).toDF.printSchema
    // root
    // |-- notNullInt: integer (nullable = true) // NULLABLE TOO!
    // |-- nullableInt: integer (nullable = true)
    
    myArray.toSeq.toDF().printSchema
    // root
    // |-- notNullInt: integer (nullable = false)
    // |-- nullableInt: integer (nullable = true)
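
If you need a particular nullability flag regardless of what inference gives you, one option (a minimal sketch of my own, not something prescribed by Spark's docs) is to declare the schema explicitly and pass it to createDataFrame, reusing the Int-only Test case class and myArray from point 2:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // Declare the nullability flags yourself instead of relying on inference.
    val explicitSchema = StructType(Seq(
      StructField("notNullInt", IntegerType, nullable = false),
      StructField("nullableInt", IntegerType, nullable = true)
    ))

    // Convert each case class instance to a Row; Int.box turns the Option[Int]
    // into a boxed Integer so that orNull can yield null for None.
    val rowRdd = sc.parallelize(myArray).map { t =>
      Row(t.notNullInt, t.nullableInt.map(Int.box).orNull)
    }

    spark.createDataFrame(rowRdd, explicitSchema).printSchema
    // root
    //  |-- notNullInt: integer (nullable = false)
    //  |-- nullableInt: integer (nullable = true)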