
My objective is to read data from a CSV file and convert my RDD to a DataFrame in Scala/Spark. This is my code:

package xxx.DataScience.CompensationStudy

import org.apache.spark._
import org.apache.log4j._

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext


object CompensationAnalysis {

  case class GetDF(profil_date:String, profil_pays:String, param_tarif2:String, param_tarif3:String, dt_titre:String, dt_langues:String,
    dt_diplomes:String, dt_experience:String, dt_formation:String, dt_outils:String, comp_applications:String, 
    comp_interventions:String, comp_competence:String)

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("CompensationAnalysis ")
    val sc = new SparkContext(conf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._


    val lines = sc.textFile("C:/Users/../Downloads/CompensationStudy.csv").flatMap { l => 


      l.split(",") match {

        case field: Array[String] if field.size > 13 => Some(field(0), field(1), field(2), field(3), field(4), field(5), field(6), field(7), field(8), field(9), field(10), field(11), field(12))

        case field: Array[String] if field.size == 1 => Some((field(0), "default value"))

        case _ => None 
      }


    }

At this stage, I get the error: Product with Serializable does not take parameters

    val summary = lines.collect().map(x => GetDF(x("profil_date"), x("profil_pays"), x("param_tarif2"), x("param_tarif3"), x("dt_titre"), x("dt_langues"), x("dt_diplomes"), x("dt_experience"), x("dt_formation"), x("dt_outils"), x("comp_applications"), x("comp_interventions"), x("comp_competence")))

    val sum_df = summary.toDF()

    df.printSchema


  }

}


Help please?

Your problem is in the flatMap defining lines. The only type the compiler is able to infer is RDD[Product with Serializable], because you have different types in your Options. - Cyrille Corpet

1 Answer


You have several things you should improve. The most urgent problem, which causes the exception, is the one @CyrilleCorpet points out in the comments: the three different lines in the pattern matching return values of types Some[Tuple13], Some[Tuple2], and None.type. The least upper bound is then Option[Product with Serializable], which complies with flatMap's signature (where the result should be an Iterable[T]) modulo some implicit conversion.

Basically, if the branches all returned the same element type, say Some[Tuple13] and None, or Some[Tuple2] and None, you would be better off.
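
To see the inference problem in isolation, here is a minimal sketch with toy data (not your CSV), showing how mixing tuple arities degrades the element type:

// Toy data, for illustration only: branches return Some of different tuple arities.
val mixed = List("a,b,c", "x").flatMap { s =>
  s.split(",") match {
    case arr if arr.length == 3 => Some((arr(0), arr(1), arr(2)))  // Some[(String, String, String)]
    case arr if arr.length == 1 => Some((arr(0), "default"))       // Some[(String, String)]
    case _                      => None                            // None.type
  }
}
// The least upper bound of the branch types makes mixed a List[Product with Serializable],
// so the tuple fields are no longer accessible.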

Also, pattern matching on types is generally a bad idea because of type erasure, and pattern matching isn't a great fit for your situation anyway.

So you could set default values in your case class:

case class GetDF(profil_date: String,
                 profil_pays: String = "default",
                 param_tarif2: String = "default",
                 ...
)
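
With defaults on every field after the first, a row that only has one token still builds a complete record (the date below is just a made-up example value):

GetDF("2017-05-01")  // all the other fields fall back to "default"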

Then in your lambda:

val tokens = l.split(",")
if (tokens.length > 13) {
  Some(GetDF(tokens(0), tokens(1), tokens(2), ...))
} else if (tokens.length == 1) {
  Some(GetDF(tokens(0)))
} else {
  None
}

Now all the branches return Option[GetDF]. Since this code runs inside flatMap, the Nones are dropped automatically and you are left with an RDD[GetDF] containing only GetDF instances.
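
Once lines is an RDD[GetDF], you no longer need the collect() step from your question; toDF() works directly on the RDD. A short sketch, assuming the sqlContext and the implicits import already present in your code:

import sqlContext.implicits._   // enables .toDF() on RDDs of case classes

// lines: RDD[GetDF] produced by the flatMap above
val sum_df = lines.toDF()
sum_df.printSchema()
sum_df.show(5)   // optional: sanity-check a few rows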