
I am a beginner learning Spark with Scala, so pardon my broken English. I need to write a program that parses delimited and fixed-width files into DataFrames using the Spark Scala DataFrame API. Also, if the input data is corrupted, the program must handle it in one of the following ways:

A: ignore the corrupt input data
B: keep the corrupt input so the error can be investigated
C: stop on the error

To accomplish this, I have successfully implemented parsing with exception handling for the delimited file using DataFrame API options. But I have no idea how to apply the same technique to a fixed-width file. I am using Spark 2.4.3.
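For reference, the three requirements line up with the three values of the CSV reader's `mode` option: `DROPMALFORMED` drops malformed rows (A), `PERMISSIVE` keeps them with null fields and, when the schema declares a `_corrupt_record` string column, stores the raw line there (B), and `FAILFAST` throws on the first malformed row (C). A tiny sketch of that mapping; the `OnCorrupt` type and its case names are hypothetical labels, not Spark API.

```scala
// A minimal sketch: map the three required behaviours onto the values that
// Spark's CSV reader accepts for its "mode" option.
object ReadModes {
  sealed trait OnCorrupt
  case object Ignore      extends OnCorrupt // A: drop malformed rows
  case object Investigate extends OnCorrupt // B: keep them, raw text in _corrupt_record
  case object Stop        extends OnCorrupt // C: fail on the first malformed row

  def sparkMode(policy: OnCorrupt): String = policy match {
    case Ignore      => "DROPMALFORMED"
    case Investigate => "PERMISSIVE"
    case Stop        => "FAILFAST"
  }

  // Usage with Spark (not run here):
  //   spark.read.format("csv").option("header", "true").schema(schema)
  //     .option("mode", sparkMode(Investigate)).load("empdata.csv")
}
```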

// predefined schema used in the program
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("empno", IntegerType, true)
  .add("ename", StringType, true)
  .add("designation", StringType, true)
  .add("manager", StringType, true)
  .add("hire_date", StringType, true)
  .add("salary", DoubleType, true)
  .add("deptno", IntegerType, true)
  // extra column that receives the raw text of malformed rows in PERMISSIVE mode
  .add("_corrupt_record", StringType, true)

// parse the CSV file into a DataFrame
// option("mode", "PERMISSIVE") keeps malformed rows and stores their raw text in _corrupt_record
val textDF = sqlContext.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .load("empdata.csv")
textDF.show

// program for fixed-width lines

// lsplit splits a line into a list of tokens, one token per column width in pos

def lsplit(pos: List[Int], str: String): List[String] = {
  val (_, result) = pos.foldLeft((str, List[String]())) {
    case ((s, res), curr) =>
      if (s.length <= curr) {
        // the remaining text fits in this column: take all of it, nothing is left
        ("", s.trim :: res)
      } else {
        // take the next `curr` characters as this column, keep the rest for the next one
        (s.substring(curr), s.substring(0, curr).trim :: res)
      }
  }
  // tokens were prepended, so reverse to restore column order
  result.reverse
}
// case class to hold the parsed data
case class EMP(empno: Int, ename: String, designation: String, manager: String,
               hire_dt: String, salary: Double, deptno: Int)

// width of each fixed-width column, in schema order
val sizeOfColumn = List(4, 4, 5, 4, 10, 8, 2)

// transform each line into an EMP record
// (needs import spark.implicits._ for the Encoder[EMP]; spark-shell imports it automatically)
val ttRdd = textDF.map { x =>
  val row = lsplit(sizeOfColumn, x.mkString)
  EMP(row(0).toInt, row(1), row(2), row(3), row(4), row(5).toDouble, row(6).toInt)
}


The code works fine for proper data but fails if the file contains incorrect data.
For example, if the "empno" column contains non-integer data, the program throws a NumberFormatException.
The program must handle data in the file that does not match the specified schema, just as the delimited-file version does.

Kindly help me here. I need to use the same method for the fixed-width file as I used for the delimited file.
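Because the fixed-width parsing happens in your own `map` function rather than inside a Spark data source, the `mode` option has nothing to act on there, so the handling has to be written by hand. A minimal plain-Scala sketch, assuming the column widths from the question and a made-up sample line: `parse` returns a `Try`, and three small wrappers emulate PERMISSIVE (keeping the raw line in a hypothetical `corruptRecord` field), DROPMALFORMED and FAILFAST. In Spark the same functions could presumably be applied to `spark.read.textFile(path)` (a `Dataset[String]`) with `map`/`flatMap`; that wiring is an assumption and is not shown.

```scala
import scala.util.{Failure, Success, Try}

object FixedWidth {
  // Same fields as the question's EMP case class
  case class EMP(empno: Int, ename: String, designation: String, manager: String,
                 hire_dt: String, salary: Double, deptno: Int)

  // PERMISSIVE-style result: a parsed record, or the raw line for investigation
  // (corruptRecord is a hypothetical name, analogous to _corrupt_record)
  case class ParsedRow(record: Option[EMP], corruptRecord: Option[String])

  val widths: List[Int] = List(4, 4, 5, 4, 10, 8, 2)

  // Cut a line into trimmed fields; fail if its length does not match the layout
  def split(line: String): Try[List[String]] = Try {
    require(line.length == widths.sum, s"expected ${widths.sum} chars, got ${line.length}")
    val offsets = widths.scanLeft(0)(_ + _) // start offsets plus the total length
    offsets.zip(offsets.tail).map { case (from, to) => line.substring(from, to).trim }
  }

  // Any length or type error ends up as a Failure instead of an uncaught exception
  def parse(line: String): Try[EMP] = split(line).flatMap { f =>
    Try(EMP(f(0).toInt, f(1), f(2), f(3), f(4), f(5).toDouble, f(6).toInt))
  }

  // B: keep every line; malformed ones carry their raw text
  def permissive(lines: Seq[String]): Seq[ParsedRow] = lines.map { line =>
    parse(line) match {
      case Success(emp) => ParsedRow(Some(emp), None)
      case Failure(_)   => ParsedRow(None, Some(line))
    }
  }

  // A: silently drop malformed lines
  def dropMalformed(lines: Seq[String]): Seq[EMP] = lines.flatMap(l => parse(l).toOption)

  // C: stop with the underlying exception on the first malformed line
  def failFast(lines: Seq[String]): Seq[EMP] = lines.map(l => parse(l).get)
}
```

Wrapping every conversion in `Try` keeps the decision about what to do with a bad line in one place, so switching between the three behaviours only means calling a different wrapper.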

So if the length is incorrect, an error must be generated? - thebluephantom
What does B mean? - thebluephantom
I mean if the actual data in the input file does not match the schema. As you can see, I am converting one field to Double, but the program fails if a non-numeric character comes in the input. The same case is handled for the delimited file by the DataFrame API using mode="PERMISSIVE", but I can't find an equivalent for the fixed-width file. - macmore
B means the corrupt record gets copied to the _corrupt_record column, so I can investigate why the error took place in the first place. - macmore
But that should happen, interesting. - thebluephantom

1 Answer


It's sort of obvious.

You are mixing your own parsing approach with the API's PERMISSIVE option.

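The PERMISSIVE option picks up errors such as a wrong data type. But your own lsplit process still executes afterwards and can then fail: for a corrupt row, x.mkString renders the null fields as the text "null", which toInt cannot parse, so you get a NumberFormatException. For example, if I put in "YYY" as empno, this is clearly observable.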
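The `"null"` in the exception message below comes from `x.mkString`: a Row's `mkString` joins the field values, and a null field is rendered as the four characters `null`. A small plain-Scala reproduction, using a `Seq[Any]` as an assumed stand-in for the corrupt Row of the first case (empno and ename null, raw text in `_corrupt_record`):

```scala
import scala.util.Try

object NullDemo {
  // Stand-in for the corrupt Row of the first case: empno and ename are null,
  // the raw text sits in _corrupt_record
  val fields: Seq[Any] = Seq(null, null, "YYY,Gerry")

  // mkString renders each null as the text "null"
  val line: String = fields.mkString

  // lsplit's first 4-character token is therefore the literal string "null"
  val empnoText: String = line.substring(0, 4)

  // ... which toInt rejects with NumberFormatException: For input string: "null"
  val parsed: Try[Int] = Try(empnoText.toInt)
}
```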

If the data type is OK but the length is wrong, your process runs to completion in most cases, but the fields are garbled.
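To see how a wrong length garbles the fields without raising any error, here is a plain-Scala reproduction of the second case. It assumes widths of 4 (empno) and 5 (ename), which is consistent with the EMP(4444,44Ger) result shown for that case; `GarbleDemo`, `cut` and `lengthOk` are hypothetical names.

```scala
object GarbleDemo {
  // Assumed layout for the answer's two-column test: empno (4 chars), ename (5 chars)
  val widths: List[Int] = List(4, 5)

  // Width-based cut like the question's lsplit: leftover characters are silently ignored
  def cut(line: String): List[String] = {
    val offsets = widths.scanLeft(0)(_ + _)
    offsets.zip(offsets.tail).map { case (from, to) =>
      line.substring(math.min(from, line.length), math.min(to, line.length)).trim
    }
  }

  // empno has 6 digits instead of 4, so every later column shifts by 2
  val tooLong: String = "444444Gerry"
  val fields: List[String] = cut(tooLong) // "4444" and "44Ger": valid types, wrong values

  // A cheap guard that flags the row before any conversion happens
  def lengthOk(line: String): Boolean = line.length == widths.sum
}
```

Since "4444" converts to an Int without complaint, no type check can catch this; only a structural check such as the line length can.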

Your lsplit needs to be more robust, and you need to check whether an error exists, either inside lsplit or before invoking it, and not invoke it for rows that are already flagged as corrupt.
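One way to make the splitting more robust, sketched in plain Scala: validate the line length before cutting, convert fields inside `Either`, and return a reason string that could be logged or stored next to the raw line for investigation. The two-field layout mirrors the answer's test and is an assumption; `RobustSplit` and the message texts are hypothetical.

```scala
import scala.util.Try

object RobustSplit {
  // Hypothetical field specs: (name, width). Adjust to the real layout.
  val fields: List[(String, Int)] = List("empno" -> 4, "ename" -> 5)

  // Split a fixed-width line, or explain why it cannot be split
  def lsplit(line: String): Either[String, List[String]] = {
    val expected = fields.map(_._2).sum
    if (line == null) Left("null line")
    else if (line.length != expected)
      Left(s"length ${line.length}, expected $expected: '$line'")
    else {
      val offsets = fields.map(_._2).scanLeft(0)(_ + _)
      Right(offsets.zip(offsets.tail).map { case (from, to) => line.substring(from, to).trim })
    }
  }

  // Convert the numeric field, naming the column on failure
  def empno(tokens: List[String]): Either[String, Int] =
    Try(tokens.head.toInt).toOption.toRight(s"empno is not an integer: '${tokens.head}'")

  // Stops at the first problem and reports it; a valid line yields (empno, ename)
  def parse(line: String): Either[String, (Int, String)] =
    for {
      tokens <- lsplit(line)
      no     <- empno(tokens)
    } yield (no, tokens(1))
}
```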

First case (empno "YYY"):

+-----+-----+---------------+
|empno|ename|_corrupt_record|
+-----+-----+---------------+
| null| null|      YYY,Gerry|
| 5555|Wayne|           null|
+-----+-----+---------------+

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 1 times, most recent failure: Lost task 0.0 in stage 30.0 (TID 30, localhost, executor driver): java.lang.NumberFormatException: For input string: "null"

Second case (empno "444444": the right type, but the wrong length):

+------+-----+---------------+
| empno|ename|_corrupt_record|
+------+-----+---------------+
|444444|Gerry|           null|
|  5555|Wayne|           null|
+------+-----+---------------+

res37: Array[EMP] = Array(EMP(4444,44Ger), EMP(5555,Wayne))

In short, there is some work to do, and in fact there is no need for a header.