0
votes

Would you be able to help in this spark prob statement

Data -

empno|ename|designation|manager|hire_date|sal|deptno    
7369|SMITH|CLERK|9902|2010-12-17|800.00|20
7499|ALLEN|SALESMAN|9698|2011-02-20|1600.00|30

Code:

val rawrdd = spark.sparkContext.textFile("C:\\Users\\cmohamma\\data\\delta scenarios\\emp_20191010.txt")

val refinedRDD = rawrdd.map( lines => {   
val fields = lines.split("\\|")   (fields(0).toInt,fields(1),fields(2),fields(3).toInt,fields(4).toDate,fields(5).toFloat,fields(6).toInt)  
})

Problem Statement - This is not working -fields(4).toDate , whats is the alternative or what is the usage ?

What i have tried ?

  1. tried replacing it to - to_date(col(fields(4)) , "yyy-MM-dd") - Not working

2.

Step 1.

val refinedRDD = rawrdd.map( lines => {   
val fields = lines.split("\\|")    
(fields(0),fields(1),fields(2),fields(3),fields(4),fields(5),fields(6))
})

Now this tuples are all strings

Step 2.

mySchema = StructType(StructField(empno,IntegerType,true), StructField(ename,StringType,true), StructField(designation,StringType,true), StructField(manager,IntegerType,true), StructField(hire_date,DateType,true), StructField(sal,DoubleType,true), StructField(deptno,IntegerType,true))

Step 3. converting the string tuples to Rows

val rowRDD = refinedRDD.map(attributes => Row(attributes._1, attributes._2, attributes._3, attributes._4, attributes._5 , attributes._6, attributes._7))

Step 4.

val empDF = spark.createDataFrame(rowRDD, mySchema)

This is also not working and gives error related to types. to solve this i changed the step 1 as

(fields(0).toInt,fields(1),fields(2),fields(3).toInt,fields(4),fields(5).toFloat,fields(6).toInt)

Now this is giving error for the date type column and i am again at the main problem.

Use Case - use textFile Api, convert this to a dataframe using custom schema (StructType) on top of it.

This can be done using the case class but in case class also i would be stuck where i would need to do a fields(4).toDate (i know i can cast string to date later in code but if the above problem solutionis possible)

2
Why don't you simply read as csv, with inferSchema or providing custom schema? val df = spark.read .option("delimiter", "\\|") .option("header", true) .option("inferSchema", "true") .csv(path) Should be Should be enough to read a dataframe.koiralo
@ShankarKoirala because the file is not a csv, it is a .dat file with delimiter pipe, I am creating custom schema in step 2 and getting error in step 3 when i am converting the tuples to rows to create a dataframe using that rdd of rows. Do you know a way to attach a custom schema to an rdd i order to create a dataframe ?Chand Mohammad

2 Answers

0
votes

You can use the following code snippet

import org.apache.spark.sql.functions.to_timestamp

scala> val df = spark.read.format("csv").option("header", "true").option("delimiter", "|").load("gs://otif-etl-input/test.csv")
df: org.apache.spark.sql.DataFrame = [empno: string, ename: string ... 5 more fields]

scala> val ts = to_timestamp($"hire_date", "yyyy-MM-dd")
ts: org.apache.spark.sql.Column = to_timestamp(`hire_date`, 'yyyy-MM-dd')

scala> val enriched_df = df.withColumn("ts", ts).show(2, false)
+-----+-----+-----------+-------+----------+-------+----------+-------------------+
|empno|ename|designation|manager|hire_date |sal    |deptno    |ts                 |
+-----+-----+-----------+-------+----------+-------+----------+-------------------+
|7369 |SMITH|CLERK      |9902   |2010-12-17|800.00 |20        |2010-12-17 00:00:00|
|7499 |ALLEN|SALESMAN   |9698   |2011-02-20|1600.00|30        |2011-02-20 00:00:00|
+-----+-----+-----------+-------+----------+-------+----------+-------------------+

enriched_df: Unit = ()
0
votes

There are multiple ways to cast your data to proper data types.

First : use InferSchema

val df = spark.read .option("delimiter", "\\|").option("header", true) .option("inferSchema", "true").csv(path)
df.printSchema

Some time it doesn't work as expected. see details here

Second : provide your own Datatype conversion template

val rawDF = Seq(("7369", "SMITH" , "2010-12-17", "800.00"), ("7499", "ALLEN","2011-02-20", "1600.00")).toDF("empno", "ename","hire_date", "sal")
//define schema in DF , hire_date as Date
val schemaDF = Seq(("empno", "INT"), ("ename", "STRING"),  (**"hire_date", "date"**) , ("sal", "double")).toDF("columnName", "columnType")
rawDF.printSchema

enter image description here

    //fetch schema details
    val dataTypes = schemaDF.select("columnName", "columnType")
    val listOfElements = dataTypes.collect.map(_.toSeq.toList)
    //creating a map friendly template
    val validationTemplate = (c: Any, t: Any) => {
       val column = c.asInstanceOf[String]
       val typ = t.asInstanceOf[String]
       col(column).cast(typ)
      }

     //Apply datatype conversion template on rawDF 
    val convertedDF = rawDF.select(listOfElements.map(element => validationTemplate(element(0), element(1))): _*)
    println("Conversion done!")
    convertedDF.show()
    convertedDF.printSchema

enter image description here

Third : Case Class

Create schema from caseclass with ScalaReflection and provide this customized schema while loading DF.

  import org.apache.spark.sql.catalyst.ScalaReflection
  import org.apache.spark.sql.types._

  case class MySchema(empno: int, ename: String, hire_date: Date, sal: Double)

  val schema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType]

  val rawDF = spark.read.schema(schema).option("header", "true").load(path)
  rawDF.printSchema

Hope this will help.