33
votes

I'm manually creating a dataframe for some testing. The code to create it is:

case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
  .createDataFrame(List(input(1110,0,1001,-10.00),
    input(1111,1,1001,10.00),
    input(1111,0,1002,10.00)))

So the schema looks like this:

root
 |-- id: long (nullable = false)
 |-- var1: integer (nullable = false)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

I want to make 'nullable = true' for each one of these variable. How do I declare that from the start or switch it in a new dataframe after it's been created?

6
input should instead be Input - case class names should be capitalized (title case)Shafique Jamal

6 Answers

47
votes

Answer

With the imports

import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

you can use

/**
 * Set nullable property of column.
 * @param df source DataFrame
 * @param cn is the column name to change
 * @param nullable is the flag to set, such that the column is  either nullable or not
 */
def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

  // get schema
  val schema = df.schema
  // modify [[StructField] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
    case y: StructField => y
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

directly.

Also you can make the method available via the "pimp my library" library pattern ( see my SO post What is the best way to define custom methods on a DataFrame? ), such that you can call

val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )

Edit

Alternative solution 1

Use a slight modified version of setNullableStateOfColumn

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  // get schema
  val schema = df.schema
  // modify [[StructField] with name `cn`
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

Alternative solution 2

Explicitely define the schema. (Use reflection to create a solution that is more general)

configuredUnitTest("Stackoverflow.") { sparkContext =>

  case class Input(id:Long, var1:Int, var2:Int, var3:Double)

  val sqlContext = new SQLContext(sparkContext)
  import sqlContext.implicits._


  // use this to set the schema explicitly or
  // use refelection on the case class member to construct the schema
  val schema = StructType( Seq (
    StructField( "id", LongType, true),
    StructField( "var1", IntegerType, true),
    StructField( "var2", IntegerType, true),
    StructField( "var3", DoubleType, true)
  ))

  val is: List[Input] = List(
    Input(1110, 0, 1001,-10.00),
    Input(1111, 1, 1001, 10.00),
    Input(1111, 0, 1002, 10.00)
  )

  val rdd: RDD[Input] =  sparkContext.parallelize( is )
  val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
  val inputDF = sqlContext.createDataFrame( rowRDD, schema ) 

  inputDF.printSchema
  inputDF.show()
}
27
votes

Another option, if you need to change dataframe in-place, and recreating is impossible, you can do something like this:

.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))

Spark will then think that this column may contain null, and nullability will be set to true. Also, you can use udf, to wrap your values in Option. Works fine even for streaming cases.

17
votes

This is a late answer, but wanted to give an alternative solution for people that come here. You can automatically make a DataFrame Column nullable from the start by the following modification to your code:

case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double)
val inputDF = sqlContext
  .createDataFrame(List(input(Some(1110),Some(0),1001,-10.00),
    input(Some(1111),Some(1),1001,10.00),
    input(Some(1111),Some(0),1002,10.00)))
inputDF.printSchema

This will yield:

root
 |-- id: long (nullable = true)
 |-- var1: integer (nullable = true)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

defined class input
inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]

Essentially, if you declare a field as an Option by using Some([element]) or None as the actual inputs, then that field be nullable. Otherwise, the field will not be nullable. I hope this helps!

10
votes

More compact version of setting all columns nullable parameter

Instead of case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m) one can use _.copy(nullable = nullable). Then the whole function can be written as:

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable))))
}
3
votes

Just use java.lang.Integer instead of scala.Int in your case class.

case class input(id:Long, var1:java.lang.Integer , var2:java.lang.Integer , var3:java.lang.Double)
2
votes

Thanks Martin Senne. Just a little addition. In case of inner struct types, you may need to set nullable recursively, like this:

def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
    def set(st: StructType): StructType = {
      StructType(st.map {
        case StructField(name, dataType, _, metadata) =>
          val newDataType = dataType match {
            case t: StructType => set(t)
            case _ => dataType
          }
          StructField(name, newDataType, nullable = nullable, metadata)
      })
    }

    df.sqlContext.createDataFrame(df.rdd, set(df.schema))
  }