
I need some help understanding iteration in Scala. I have a dataframe whose columns have different types (Int, String, Date, Long). I want to read each row's data in a loop; if a column's value does not match the column's declared datatype, I want to replace that value with null.

I have tried to read each column value and check it against its expected datatype, i.e. the ID column is Integer, AirName is String, Place is String, and TakeoffDate is Date. My input dataset is:

+-----+-------+-----+-----------+
|   ID|AirName|Place|TakeoffDate|
+-----+-------+-----+-----------+
|    1|  Delta|  Aus|    1/11/18|
|    2|  Delta|     |    10/5/19|
|Three|   null|  New| 15/10/2018|
|    4| JetAir|  Aus|    11/6/15|
+-----+-------+-----+-----------+

Here Three is a string, but ID is declared as Int, so I want to replace Three with null, and similarly for the other columns in the loop.

I read the data from a file and created a dataframe. Now I want to check each row and each column against its expected datatype; if the datatype doesn't match, I want to replace that column's value with null.

But that is not working for me.

val myFile = sc.textFile("/FileStore/tables/Airline__2_-14f6c.txt")

import org.apache.spark.sql.types._
case class Test(ID:Int,AirName:String,Place:String,TakeoffDate:String)
val df= myFile.map(x => x.split(",") ).map( x=> Test(x(0).toInt,x(1),x(2),x(3)) ).toDF()

def isInt(in: String): Option[Int] = {
    try {
        Some(Integer.parseInt(in))
    } catch {
        case e: NumberFormatException => None
    }
}

rows.map {
  case (id) =>
    if (isInt(id).isEmpty) {
      println("yes")
      (id, ErrorCodes.NOT_INT)
    } else {
      println("no")
      (id.toInt, None)
    }
}

Expected Output
+-----+-------+-----+-----------+
|   ID|AirName|Place|TakeoffDate|
+-----+-------+-----+-----------+
|    1|  Delta|  Aus|    1/11/18|
|    2|  Delta| null|    10/5/19|
| null|   null|  New|       null|
|    4| JetAir|  Aus|    11/6/15|
+-----+-------+-----+-----------+


1 Answer


Instead of the RDD API I would suggest the DataFrame/Dataset API, which offers richer functionality:

import org.apache.spark.sql.functions.{trim, length, when}
import spark.implicits._  // needed for toDF and the $ column syntax outside the spark-shell

val df = Seq(
  ("1", "Delta", "Aus", "1/11/18"),
  ("2", "Delta", null, "10/5/19"),
  ("Three", null, "New", "15/10/2018"),
  ("4", "JetAir", "Aus", "11/6/15"))
.toDF("ID", "AirName","Place", "TakeoffDate")

df.withColumn("ID", $"ID".cast("int"))
.withColumn("TakeoffDate", 
            when(
              $"TakeoffDate".rlike("\\d{1,2}/\\d{1,2}/\\d{1,2}$"), $"TakeoffDate")
            .otherwise(null)
           )
.withColumn("Place", 
            when(
                length(trim($"Place")) > 0, $"Place")
            .otherwise(null))
.show(false)

Output:

+----+-------+-----+-----------+
|ID  |AirName|Place|TakeoffDate|
+----+-------+-----+-----------+
|1   |Delta  |Aus  |1/11/18    |
|2   |Delta  |null |10/5/19    |
|null|null   |New  |null       |
|4   |JetAir |Aus  |11/6/15    |
+----+-------+-----+-----------+

Casting

  • $"ID".cast("int"): you cast any string into int. If conversion is not possible cast will return null by default.
  • $"TakeoffDate".rlike("\\d{1,2}/\\d{1,2}/\\d{1,2}$"): the date should have the format specified in this regex. rlike returns true if the string is a match otherwise false.
  • length(trim($"Place")) > 0: when string is empty return null otherwise the value of $"Place".