1 vote

I am not able to perform an implicit conversion from an RDD to a DataFrame in a Scala program, even though I am importing spark.implicits._.

Any help would be appreciated.

Main Program with the implicits:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object spark1 {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("e1").config("o1", "sv").getOrCreate()

    import spark.implicits._

    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = spark.sparkContext
    val data = sc.textFile("/TestDataB.txt")
    val allSplit = data.map(line => line.split(","))
    case class CC1(LAT: Double, LONG: Double)
    val allData = allSplit.map( p => CC1( p(0).trim.toDouble, p(1).trim.toDouble))
    val allDF = allData.toDF()   // this is the line that fails to compile
    // ... other code
  }

}

The error is as follows:

Error:(40, 25) value toDF is not a member of org.apache.spark.rdd.RDD[CC1]
    val allDF = allData.toDF()


2 Answers

1 vote

When you define the case class CC1 inside the main method, you hit https://issues.scala-lang.org/browse/SI-6649; toDF() then fails to locate the appropriate implicit TypeTag for that class at compile time.

You can see this in the following simple example:

import scala.reflect.runtime.universe.TypeTag

case class Out()

object TestImplicits {

  def main(args: Array[String]) {
    case class In()
    val typeTagOut = implicitly[TypeTag[Out]] // compiles
    val typeTagIn = implicitly[TypeTag[In]]   // does not compile: Error:(23, 31) No TypeTag available for In
  }

}

Spark's relevant implicit conversion has the type parameter [T <: Product : TypeTag] (see newProductEncoder in SQLImplicits), which means an implicit TypeTag[CC1] is required.
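To make that mechanism concrete, here is a minimal, Spark-free sketch of the same pattern: a hypothetical toDF() extension method that, like Spark's, is only usable when a TypeTag for the element type can be found. FakeDF, ToDFOps, Top, Local and Demo are made-up names for illustration only:

import scala.reflect.runtime.universe.TypeTag

case class Top(x: Int) // top-level case class: the compiler can materialize a TypeTag for it

object Demo {

  class FakeDF // stand-in for DataFrame in this sketch

  // Stand-in for Spark's newProductEncoder/toDF chain: the extension method
  // requires an implicit TypeTag[T] for the element type.
  implicit class ToDFOps[T <: Product : TypeTag](data: Seq[T]) {
    def toDF(): FakeDF = new FakeDF
  }

  def main(args: Array[String]): Unit = {
    val ok = Seq(Top(1)).toDF() // compiles: TypeTag[Top] is available

    case class Local(x: Int) // defined inside main, like CC1 in the question
    // val ko = Seq(Local(1)).toDF() // does not compile: No TypeTag available for Local
  }

}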

To fix this, simply move the definition of CC1 out of the method, or out of the object entirely:

import org.apache.spark.sql.SparkSession

case class CC1(LAT: Double, LONG: Double)

object spark1 {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("e1").config("o1", "sv").getOrCreate()

    import spark.implicits._

    val data = spark.sparkContext.textFile("/TestDataB.txt")
    val allSplit = data.map(line => line.split(","))

    val allData = allSplit.map( p => CC1( p(0).trim.toDouble, p(1).trim.toDouble))
    val allDF = allData.toDF()   // now compiles: TypeTag[CC1] can be resolved
    // ... other code
  }

}
0 votes

I thought toDF was in sqlContext.implicits._, so you would need to import that rather than spark.implicits._. At least that is the case in Spark 1.6.
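For reference, a rough sketch of the Spark 1.6-style setup this describes, where the implicits come from a SQLContext instance (Spark16Example is a made-up object name; the file path is reused from the question):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class CC1(LAT: Double, LONG: Double) // must still be top-level for the TypeTag to resolve

object Spark16Example {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings toDF() into scope in Spark 1.6

    val allDF = sc.textFile("/TestDataB.txt")
      .map(_.split(","))
      .map(p => CC1(p(0).trim.toDouble, p(1).trim.toDouble))
      .toDF()
  }

}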