I need some help understanding iteration in 'scala'. I have a DataFrame whose columns hold different types of data '(Int, String, Date, Long)'. I want to read each row in a loop, and whenever a column's value does not match that column's declared datatype, replace the value with null.
I have tried reading each column value and checking it against the respective datatype, i.e. 'ID: Integer, AirName: String, Place: String, TakeoffDate: Date'. My input dataset is:
+-----+-------+-----+-----------+
|   ID|AirName|Place|TakeoffDate|
+-----+-------+-----+-----------+
|    1|  Delta|  Aus|    1/11/18|
|    2|  Delta|     |    10/5/19|
|Three|   null|  New| 15/10/2018|
|    4| JetAir|  Aus|    11/6/15|
+-----+-------+-----+-----------+
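For a single column, I understand Spark's cast would already behave this way: casting a non-numeric string to Int yields null. A minimal sketch of what I mean, assuming df holds the data above with every column read as a string:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

// "Three" cannot be cast to Int, so Spark returns null for that row.
df.withColumn("ID", col("ID").cast(IntegerType)).show()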
In the input above, 'Three' is a String, but the ID column is declared as Int, so I want to replace 'Three' with null, and do the same for the other columns in a loop.
I read the data from a file and created a DataFrame. Now I want to check each row and each column against its respective datatype, and replace any value that doesn't match with null.
But my attempt below is not working for me.
import spark.implicits._ // needed for .toDF() on an RDD of case classes

val myFile = sc.textFile("/FileStore/tables/Airline__2_-14f6c.txt")

case class Test(ID: Int, AirName: String, Place: String, TakeoffDate: String)

// x(0).toInt throws NumberFormatException on the "Three" row,
// so this fails before I ever get a chance to substitute null:
val df = myFile.map(x => x.split(",")).map(x => Test(x(0).toInt, x(1), x(2), x(3))).toDF()
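An alternative I have read about is letting Spark's CSV reader do the type checking: with an explicit schema, the default PERMISSIVE mode is supposed to set fields that fail to parse to null instead of throwing. A sketch of that, assuming the file is comma-separated with no header row, and spark is the SparkSession (TakeoffDate is kept as String here because the year format in the data is inconsistent):

import org.apache.spark.sql.types._

// Declare the intended type of each column up front.
val schema = StructType(Seq(
  StructField("ID", IntegerType, nullable = true),
  StructField("AirName", StringType, nullable = true),
  StructField("Place", StringType, nullable = true),
  StructField("TakeoffDate", StringType, nullable = true)
))

// In PERMISSIVE mode, a field that fails to parse (e.g. "Three" as an Int)
// becomes null rather than failing the whole job.
val df2 = spark.read
  .schema(schema)
  .csv("/FileStore/tables/Airline__2_-14f6c.txt")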
// Returns Some(n) when the string parses as an Int, None otherwise.
def isInt(in: String): Option[Int] = {
  try {
    Some(Integer.parseInt(in))
  } catch {
    case e: NumberFormatException => None
  }
}
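To apply isInt to the DataFrame itself, I believe it can be wrapped in a UDF; since the function returns Option[Int], Spark should render None as null. A sketch, again assuming ID was read as a string column:

import org.apache.spark.sql.functions.{col, udf}

// None in the Option becomes a SQL null in the result column.
val toIntOrNull = udf((s: String) => isInt(s))

val withCleanId = df.withColumn("ID", toIntOrNull(col("ID")))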
// rows is my collection of raw string values for the ID column.
rows.map { id =>
  if (isInt(id).isEmpty) {
    println("no")                // not a valid Int -> should become null
    (null, ErrorCodes.NOT_INT)
  } else {
    println("yes")
    (isInt(id).get, None)        // keep the parsed Int
  }
}
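What I would ultimately like is to apply the same cast idea to every column in a loop. My current thinking, as an untested sketch (the column-to-type pairs are my own assumption from the data above, df is again assumed to hold all columns as strings, and TakeoffDate would need separate date parsing because its formats are mixed):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// My guess at the target type of each column.
val targetTypes: Seq[(String, DataType)] = Seq(
  "ID"      -> IntegerType,
  "AirName" -> StringType,
  "Place"   -> StringType
)

// Fold over the pairs, casting each column in turn; a failed cast
// produces null, which is exactly the replacement I want.
val cleaned: DataFrame = targetTypes.foldLeft(df) {
  case (acc, (name, dataType)) => acc.withColumn(name, col(name).cast(dataType))
}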
Expected Output
+-----+-------+-----+-----------+
|   ID|AirName|Place|TakeoffDate|
+-----+-------+-----+-----------+
|    1|  Delta|  Aus|    1/11/18|
|    2|  Delta| null|    10/5/19|
| null|   null|  New|       null|
|    4| JetAir|  Aus|    11/6/15|
+-----+-------+-----+-----------+