I use Scala 2.11.8 and Spark 2.0.1 to run my code. The problem is in the following code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object training {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[4]") 
    val sc = new SparkContext(conf)
    val data = sc.textFile("/home/ahoora/data.csv")

    // create RDDs from data in form (user, product, price)
    data.map(line => line.split(",")).map(purchaserecord => (purchaserecord(0), purchaserecord(1), purchaserecord(2)))

    // Our total revenue
    val totalRevenue = data.map{ case (user, product, price) => price.toDouble}.sum()
    sc.stop()
  }
}

This code is based on Scala 2.10.x and Spark 1.x, so I think the error is caused by the version difference. I read the data from a .csv file and split each line into (user, product, price) as an RDD. Everything else is fine, but the method toDouble is not resolved. What is the correct way to do this?

The data in the .csv file has this form:

ali, food, 123

Did you recompile the code? If you use code compiled with Scala 2.10.x, it will cause issues. Also, what is the type of price? If you read it from somewhere, it may have been converted to an unsupported type. For example, if price is a string that does not hold a valid double, it might cause issues, and a null string may cause similar ones. What is your exact error message? - Assaf Mendelson
Please add the error you have and sufficient code to reproduce your issue. - maasg

1 Answer

You're using the original data (which has type RDD[String]) rather than the RDD produced by splitting each line into a 3-tuple (which has type RDD[(String, String, String)]). The pattern { case (user, product, price) => ... } cannot match a String, so price is never a String and toDouble is not resolved. RDDs are immutable, so a transformation such as map returns a new RDD instead of changing data in place. To use the result, assign the returned value and work with that:

val data = sc.textFile("/home/ahoora/data.csv")

// create RDDs from data in form (user, product, price)
val split = data.map(line => line.split(",")).map(purchaserecord => (purchaserecord(0), purchaserecord(1), purchaserecord(2)))

// Our total revenue
val totalRevenue = split.map{ case (user, product, price) => price.toDouble}.sum()
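
As one of the comments points out, toDouble will also fail at runtime if a price field is not a valid number. Below is a minimal sketch of a more defensive version. It is an illustration rather than part of the original code: the names RevenueSketch and parseLine and the sample lines are hypothetical, and a plain Seq[String] stands in for the RDD so the snippet runs without Spark. The same flatMap and map steps should carry over to the RDD returned by sc.textFile.

```scala
import scala.util.Try

object RevenueSketch {
  // Parse one CSV line into (user, product, price).
  // Returns None when the line does not have exactly three fields
  // or when the price field is not a valid number.
  def parseLine(line: String): Option[(String, String, Double)] =
    line.split(",").map(_.trim) match {
      case Array(user, product, price) =>
        Try(price.toDouble).toOption.map(p => (user, product, p))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // Assumed sample data; a Seq[String] stands in for sc.textFile(...)
    val lines = Seq("ali, food, 123", "sara, book, 45.5", "broken line")

    // flatMap returns a new collection and silently drops malformed lines;
    // `lines` itself is left unchanged, just like the original RDD
    val records = lines.flatMap(parseLine)

    // Sum the price component of every successfully parsed record
    val totalRevenue = records.map { case (_, _, price) => price }.sum
    println(totalRevenue)
  }
}
```

Dropping malformed lines is a design choice; if bad input should stop the job instead, keep the plain price.toDouble from the answer above and let the exception surface.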